# Bid Data Analysis Project

# Data Description
Dataset for project collected by Cai-Nicolas Ziegler in a 4-week crawl (August / September 2004) from the Book-Crossing community with kind permission from Ron Hornbaker, CTO of Humankind Systems.

It contains 278,858 users (anonymized but with demographic information) providing 1,149,780 ratings (explicit / implicit) about 271,379 books.

The Book-Crossing dataset comprises 3 tables:
Books.csv
Ratings.csv
Users.csv

Further information and the dataset can be found at 
https://www.kaggle.com/datasets/somnambwl/bookcrossing-dataset

#Main Assignment
1. Provide data analysis using PySpark and Colab
2. Create the necessary mashups and transformation to be included in a Star Schema model as a sparkDataFrame's that will be the base for the analysis in Tableau
3. Create the code to export the DataFrame's to specific files (json/csv) on local PC

## First, install the enviorment

In [4]:
# 1) First: install Java, Spark and and run a local Spark session by just running this on Google Colab:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz  # !wget  --> download file from url
!tar xf spark-3.2.3-bin-hadoop3.2.tgz  # !tar --> like unzip 
!pip install -q findspark  # !pip  --> instal a package

# 2) Second: set the locations where Spark and Java are installed to let know Colab where to find it.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

# 3) Third: import spark libraries and use them
import findspark
findspark.init("spark-3.2.3-bin-hadoop3.2") 
from pyspark.sql import SparkSession
# Create the session 
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu focal InRelease
[33m0% [Connecting to security.ubuntu.com (185.125.190.36)] [Connecting to cloud.r-[0m                                                                               Get:2 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
[33m0% [2 InRelease 14.2 kB/114 kB 12%] [Connecting to security.ubuntu.com (185.125[0m                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
[33m0% [2 InRelease 44.6 kB/114 kB 39%] [Connecting to security.ubuntu.com (185.125[0m[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m                                                                               Get:4 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
[33m0% [4 InRelease 11.3 kB/108 kB 10%] [Waiting for headers] [Waiting for headers][0m

In [5]:
# The session is basically our connection to Spark layer in the Hadoop ecosystem
spark = SparkSession.builder.appName('ops').getOrCreate()
spark = SparkSession.builder.appName('agg').getOrCreate()
from pyspark.sql.types import (StructField, IntegerType,StringType, StructType)
import pyspark.sql.functions 

<br>
<br>
<hr class="dashed">
<br>
<br>

## Establishing connection between "Colab" to "Google Drive"

In [6]:
# Code for connecting our google drive to this collab notebook
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


<br>
<br>
<hr class="dashed">
<br>
<br>

#Open and clean the Ratings CSV file

In [7]:
# Open the RATINGS CSV file 
RATINGS_DF = spark.read.options(header='True', delimiter=';',quote="").csv("/content/drive/MyDrive/BDA Technion/Big Data Project/BX-Book-Ratings.csv")
RATINGS_DF.show()

+--------+--------------+------------------+
|"User-ID|      ""ISBN""|""Book-Rating""",,|
+--------+--------------+------------------+
| "276725|""034545104X""|          ""0""",,|
| "276726|""0155061224""|          ""5""",,|
| "276727|""0446520802""|          ""0""",,|
| "276729|""052165615X""|          ""3""",,|
| "276729|""0521795028""|          ""6""",,|
| "276733|""2080674722""|          ""0""",,|
| "276736|""3257224281""|          ""8""",,|
| "276737|""0600570967""|          ""6""",,|
| "276744|""038550120X""|          ""7""",,|
| "276745| ""342310538""|         ""10""",,|
| "276746|""0425115801""|          ""0""",,|
| "276746|""0449006522""|          ""0""",,|
| "276746|""0553561618""|          ""0""",,|
| "276746|""055356451X""|          ""0""",,|
| "276746|""0786013990""|          ""0""",,|
| "276746|""0786014512""|          ""0""",,|
| "276747|""0060517794""|          ""9""",,|
| "276747|""0451192001""|          ""0""",,|
| "276747|""0609801279""|          ""0""",,|
| "276747|

In [8]:
# RATINGS Data cleaning
from pyspark.sql.functions  import *
RATINGS_DF=RATINGS_DF.withColumn('User-ID', regexp_replace('"User-ID', '"', '')).withColumn('ISBN', regexp_replace('""ISBN""', '"', '')).withColumn('Book-Rating', regexp_replace('""Book-Rating""",,', '"|,', '').cast('int'))
RATINGS_DF=RATINGS_DF.drop(RATINGS_DF['"User-ID'])
RATINGS_DF=RATINGS_DF.drop(RATINGS_DF['""ISBN""'])
RATINGS_DF=RATINGS_DF.drop(RATINGS_DF['""Book-Rating""",,'])
RATINGS_DF=RATINGS_DF.withColumn('User-ID', RATINGS_DF['User-ID'].cast('int'))
RATINGS_DF.show()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
| 276746|0425115801|          0|
| 276746|0449006522|          0|
| 276746|0553561618|          0|
| 276746|055356451X|          0|
| 276746|0786013990|          0|
| 276746|0786014512|          0|
| 276747|0060517794|          9|
| 276747|0451192001|          0|
| 276747|0609801279|          0|
| 276747|0671537458|          9|
+-------+----------+-----------+
only showing top 20 rows



In [9]:
RATINGS_DF.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: integer (nullable = true)



In [10]:
#Check for Null Vaules

from pyspark.sql.functions import isnan, when, count, col

RATINGS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in RATINGS_DF.columns]).show()


+-------+----+-----------+
|User-ID|ISBN|Book-Rating|
+-------+----+-----------+
|      0|   0|          0|
+-------+----+-----------+



In [11]:
RATINGS_DF.groupBy("Book-Rating").count().orderBy(RATINGS_DF["Book-Rating"].asc()).show()

+-----------+------+
|Book-Rating| count|
+-----------+------+
|          0|651330|
|          1|  1601|
|          2|  2492|
|          3|  5445|
|          4|  8031|
|          5| 46953|
|          6| 33753|
|          7| 70018|
|          8| 95037|
|          9| 62101|
|         10| 71814|
+-----------+------+



In [12]:
#Check dublicates
RATINGS_DF_distinct= RATINGS_DF.distinct()
print("Distinct count: "+str(RATINGS_DF_distinct.count()))
print("ALL count: "+str(RATINGS_DF.count()))

Distinct count: 1048575
ALL count: 1048575


In [13]:
#Check how many unique users and books exist
RATINGS_DF.select(countDistinct('ISBN')).show()
RATINGS_DF.select(countDistinct('User-ID')).show()

+--------------------+
|count(DISTINCT ISBN)|
+--------------------+
|              323418|
+--------------------+

+-----------------------+
|count(DISTINCT User-ID)|
+-----------------------+
|                  95513|
+-----------------------+



In [None]:
#Save file in csv
RATINGS_DF.coalesce(1).write.option('header', 'True').csv("/content/drive/MyDrive/BDA Technion/Big Data Project/Ratings")

<br>
<br>
<hr class="dashed">
<br>
<br>

#Open and clean the Books CSV file

In [15]:
# Open the books details CSV file
BOOKS_DF = spark.read.options(header='True', delimiter=';').csv("/content/drive/MyDrive/BDA Technion/Big Data Project/BX-Books.csv")
BOOKS_DF.show()  

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

In [16]:
# BOOKS Data cleaning
BOOKS_DF=BOOKS_DF.drop(BOOKS_DF['Image-URL-S'])
BOOKS_DF=BOOKS_DF.drop(BOOKS_DF['Image-URL-M'])
BOOKS_DF=BOOKS_DF.drop(BOOKS_DF['Image-URL-L'])
BOOKS_DF.show()

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|               1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|               1999|W. W. Norton &amp...|
|0399135782|The Kitchen God's...|             Amy Tan|               1991|    Putnam Pub Group|
|0425176428|What If?: The Wor...|       Robert Cowley|               2000|Berkley Publishin...|
|0671870432|     PLEADING GUILTY|       

In [17]:
BOOKS_DF = BOOKS_DF.withColumn("Year-Of-Publication",col("Year-Of-Publication").cast("int"))
BOOKS_DF.printSchema()
BOOKS_DF.groupBy("Year-Of-Publication").count().orderBy(BOOKS_DF["Year-Of-Publication"].desc()).show()

root
 |-- ISBN: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- Year-Of-Publication: integer (nullable = true)
 |-- Publisher: string (nullable = true)

+-------------------+-----+
|Year-Of-Publication|count|
+-------------------+-----+
|               2050|    2|
|               2038|    1|
|               2037|    1|
|               2030|    7|
|               2026|    1|
|               2024|    1|
|               2021|    1|
|               2020|    3|
|               2012|    1|
|               2011|    2|
|               2010|    2|
|               2008|    1|
|               2006|    3|
|               2005|   46|
|               2004| 5839|
|               2003|14359|
|               2002|17628|
|               2001|17360|
|               2000|17235|
|               1999|17432|
+-------------------+-----+
only showing top 20 rows



In [18]:
#It looks like that some of the books Year-Of-Publication is after 2022 😯 Lets fix this
BOOKS_DF = BOOKS_DF.withColumn("Year-Of-Publication", when(BOOKS_DF['Year-Of-Publication'] > 2021,-1).otherwise(BOOKS_DF['Year-Of-Publication']))
BOOKS_DF.groupBy("Year-Of-Publication").count().orderBy(BOOKS_DF["Year-Of-Publication"].desc()).show()

+-------------------+-----+
|Year-Of-Publication|count|
+-------------------+-----+
|               2021|    1|
|               2020|    3|
|               2012|    1|
|               2011|    2|
|               2010|    2|
|               2008|    1|
|               2006|    3|
|               2005|   46|
|               2004| 5839|
|               2003|14359|
|               2002|17628|
|               2001|17360|
|               2000|17235|
|               1999|17432|
|               1998|15767|
|               1997|14892|
|               1996|14031|
|               1995|13548|
|               1994|11796|
|               1993|10602|
+-------------------+-----+
only showing top 20 rows



In [19]:
#Check for Null Vaules

BOOKS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in BOOKS_DF.columns]).show()


+----+----------+-----------+-------------------+---------+
|ISBN|Book-Title|Book-Author|Year-Of-Publication|Publisher|
+----+----------+-----------+-------------------+---------+
|   0|         0|          0|                  0|        0|
+----+----------+-----------+-------------------+---------+



In [20]:
#Check dublicates
BOOKS_DF_distinct= BOOKS_DF.distinct()
print("Distinct count: "+str(BOOKS_DF_distinct.count()))
print("ALL count: "+str(BOOKS_DF.count()))

Distinct count: 271379
ALL count: 271379


In [21]:
#It seems like some ISBN don't exist in a book table
RATINGS_DF.select(countDistinct('ISBN').alias('QTY in Rating DF')).show()
BOOKS_DF.select(countDistinct('ISBN').alias('QTY inBOOK DF')).show()

+----------------+
|QTY in Rating DF|
+----------------+
|          323418|
+----------------+

+-------------+
|QTY inBOOK DF|
+-------------+
|       271379|
+-------------+



In [22]:
#We can fix this with anti join
Missing=RATINGS_DF.join(BOOKS_DF, [RATINGS_DF.ISBN == BOOKS_DF.ISBN], how='left_anti')
Missing.show()

+-------+-----------+-----------+
|User-ID|       ISBN|Book-Rating|
+-------+-----------+-----------+
| 276875| 880781112X|          6|
| 277087| 8471662531|          7|
| 278418| 0590468502|          0|
| 278418| 1566820413|          0|
|    183|   19973/88|          6|
|   2033| 0099428385|          8|
|   2349| 3442089611|          0|
|   2442| 887684760X|          0|
|   2442| 8879830678|          5|
|   2615| 0140258418|          7|
|   2807| 8425336880|          8|
|   3081| 9707100567|         10|
|   3363| 0471530050|          0|
|   3757| 8425336880|          6|
|   3757| 8432703518|          8|
|   3757| 8485224574|          7|
|   4207|84955010301|          0|
|   4571| 0143003011|          6|
|   5350| 3404118278|          0|
|   6703| 840132744X|          0|
+-------+-----------+-----------+
only showing top 20 rows



In [23]:
Missing=Missing.drop(Missing['User-ID'])
Missing=Missing.drop(Missing['Book-Rating'])
Missing=Missing.distinct()

In [24]:
Missing.count()

65594

In [25]:
BOOKS_DF = BOOKS_DF.unionByName(Missing, allowMissingColumns=True)

In [26]:
#We can see that there are 65595 new ISBN in Book_DF without any information
BOOKS_DF.select(countDistinct('ISBN').alias('QTY inBOOK DF')).show()
BOOKS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in BOOKS_DF.columns]).show()

+-------------+
|QTY inBOOK DF|
+-------------+
|       336973|
+-------------+

+----+----------+-----------+-------------------+---------+
|ISBN|Book-Title|Book-Author|Year-Of-Publication|Publisher|
+----+----------+-----------+-------------------+---------+
|   0|     65594|      65594|              65594|    65594|
+----+----------+-----------+-------------------+---------+



In [27]:
BOOKS_DF.show()

+----------+--------------------+--------------------+-------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|
+----------+--------------------+--------------------+-------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|               1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|               1999|W. W. Norton &amp...|
|0399135782|The Kitchen God's...|             Amy Tan|               1991|    Putnam Pub Group|
|0425176428|What If?: The Wor...|       Robert Cowley|               2000|Berkley Publishin...|
|0671870432|     PLEADING GUILTY|       

In [None]:
#Save file in csv
BOOKS_DF.coalesce(1).write.option('header', 'True').csv("/content/drive/MyDrive/BDA Technion/Big Data Project/Books")

<br>
<br>
<hr class="dashed">
<br>
<br>

#Open and clean the Users CSV file

In [93]:
# Open the users details CSV file
USERS_DF = spark.read.options(header='True', delimiter=';').csv("/content/drive/MyDrive/BDA Technion/Big Data Project/BX-Users.csv")
USERS_DF.show()  

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|  17|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|  61|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|  26|
|     11|melbourne, victor...|  14|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|  26|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|  25|
|     19|           weston, ,|  14|
|     20|langhorne, pennsy...|  19|
+-------+--------------------+----+
only showing top 20 rows



In [94]:
#Split the location into 3 column
USERS_DF = USERS_DF.withColumn('City', split(USERS_DF['Location'], ',').getItem(0)) \
       .withColumn('State', split(USERS_DF['Location'], ',').getItem(1)) \
       .withColumn('Country', split(USERS_DF['Location'], ',').getItem(2))
USERS_DF=USERS_DF.drop(USERS_DF['Location']) 
USERS_DF = USERS_DF.withColumn("Country", lower(col("Country"))) 
USERS_DF = USERS_DF.withColumn('User-ID', USERS_DF['User-ID'].cast('int'))
USERS_DF = USERS_DF.withColumn('Age', USERS_DF.Age.cast('int'))
USERS_DF.show()

+-------+----+--------------+----------------+---------------+
|User-ID| Age|          City|           State|        Country|
+-------+----+--------------+----------------+---------------+
|      1|null|           nyc|        new york|            usa|
|      2|  18|      stockton|      california|            usa|
|      3|null|        moscow| yukon territory|         russia|
|      4|  17|         porto|        v.n.gaia|       portugal|
|      5|null|   farnborough|           hants| united kingdom|
|      6|  61|  santa monica|      california|            usa|
|      7|null|    washington|              dc|            usa|
|      8|null|       timmins|         ontario|         canada|
|      9|null|    germantown|       tennessee|            usa|
|     10|  26|      albacete|       wisconsin|          spain|
|     11|  14|     melbourne|        victoria|      australia|
|     12|null|    fort bragg|      california|            usa|
|     13|  26|     barcelona|       barcelona|         

In [95]:
#There are no dublicates
USERS_DF_distinct= USERS_DF.distinct()
print("Distinct count: "+str(USERS_DF_distinct.count()))
print("ALL count: "+str(USERS_DF.count()))

Distinct count: 278859
ALL count: 278859


In [96]:
#Check for Null Vaules
from pyspark.sql.functions import isnan, when, count, col
USERS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in USERS_DF.columns]).show()

+-------+------+----+-----+-------+
|User-ID|   Age|City|State|Country|
+-------+------+----+-----+-------+
|      1|110763|   0|    3|      4|
+-------+------+----+-----+-------+



In [97]:
#Data preparation
USERS_DF=USERS_DF.select([when(col(c)=="",None).otherwise(col(c)).alias(c) for c in USERS_DF.columns])

In [98]:
USERS_DF = USERS_DF.withColumn("State", when(USERS_DF['State'].endswith(' '),regexp_replace(USERS_DF['State'], ' ','unknown')).otherwise(USERS_DF['State']))
USERS_DF = USERS_DF.withColumn("Country", when(USERS_DF['Country'].endswith(' '),regexp_replace(USERS_DF['Country'], ' ','unknown')).otherwise(USERS_DF['Country']))
USERS_DF = USERS_DF.withColumn("City", when(USERS_DF['City'].endswith(' '),regexp_replace(USERS_DF['City'], ' ','unknown')).otherwise(USERS_DF['City']))

In [99]:
USERS_DF = USERS_DF.withColumn("State", when(USERS_DF['State'].endswith('n/a'),regexp_replace(USERS_DF['State'], 'n/a','unknown')).otherwise(USERS_DF['State']))
USERS_DF = USERS_DF.withColumn("Country", when(USERS_DF['Country'].endswith('n/a'),regexp_replace(USERS_DF['Country'], 'n/a','unknown')).otherwise(USERS_DF['Country']))
USERS_DF = USERS_DF.withColumn("City", when(USERS_DF['City'].endswith('n/a'),regexp_replace(USERS_DF['City'], 'n/a','unknown')).otherwise(USERS_DF['City']))

In [100]:
USERS_DF=USERS_DF.na.fill(value='unknown',subset=["Country"])
USERS_DF=USERS_DF.na.fill(value='unknown',subset=["State"])
USERS_DF=USERS_DF.na.fill(value='unknown',subset=["City"])

In [101]:
USERS_DF.show(20)

+-------+----+--------------+----------------+---------------+
|User-ID| Age|          City|           State|        Country|
+-------+----+--------------+----------------+---------------+
|      1|null|           nyc|        new york|            usa|
|      2|  18|      stockton|      california|            usa|
|      3|null|        moscow| yukon territory|         russia|
|      4|  17|         porto|        v.n.gaia|       portugal|
|      5|null|   farnborough|           hants| united kingdom|
|      6|  61|  santa monica|      california|            usa|
|      7|null|    washington|              dc|            usa|
|      8|null|       timmins|         ontario|         canada|
|      9|null|    germantown|       tennessee|            usa|
|     10|  26|      albacete|       wisconsin|          spain|
|     11|  14|     melbourne|        victoria|      australia|
|     12|null|    fort bragg|      california|            usa|
|     13|  26|     barcelona|       barcelona|         

In [102]:
USERS_DF.printSchema()
USERS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in USERS_DF.columns]).show()

root
 |-- User-ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = false)
 |-- State: string (nullable = false)
 |-- Country: string (nullable = false)

+-------+------+----+-----+-------+
|User-ID|   Age|City|State|Country|
+-------+------+----+-----+-------+
|      1|110763|   0|    0|      0|
+-------+------+----+-----+-------+



In [103]:
#We can see a lot of Age NULL VALUES! Lets fix this
USERS_DF=USERS_DF.na.fill(value=-1,subset=["Age"])
USERS_DF.show()

+-------+---+--------------+----------------+---------------+
|User-ID|Age|          City|           State|        Country|
+-------+---+--------------+----------------+---------------+
|      1| -1|           nyc|        new york|            usa|
|      2| 18|      stockton|      california|            usa|
|      3| -1|        moscow| yukon territory|         russia|
|      4| 17|         porto|        v.n.gaia|       portugal|
|      5| -1|   farnborough|           hants| united kingdom|
|      6| 61|  santa monica|      california|            usa|
|      7| -1|    washington|              dc|            usa|
|      8| -1|       timmins|         ontario|         canada|
|      9| -1|    germantown|       tennessee|            usa|
|     10| 26|      albacete|       wisconsin|          spain|
|     11| 14|     melbourne|        victoria|      australia|
|     12| -1|    fort bragg|      california|            usa|
|     13| 26|     barcelona|       barcelona|          spain|
|     14

In [104]:
UsersGroup = USERS_DF.groupBy("Age").count()
UsersGroup.orderBy(UsersGroup["Age"].desc()).show(10)
UsersGroup.orderBy(UsersGroup["Age"].asc()).show(10)

+---+-----+
|Age|count|
+---+-----+
|244|    1|
|239|    1|
|237|    1|
|231|    1|
|230|    1|
|229|    1|
|228|    3|
|226|    1|
|223|    1|
|220|    1|
+---+-----+
only showing top 10 rows

+---+------+
|Age| count|
+---+------+
| -1|110763|
|  0|   416|
|  1|   288|
|  2|   105|
|  3|    45|
|  4|    28|
|  5|    26|
|  6|    18|
|  7|    27|
|  8|    54|
+---+------+
only showing top 10 rows



In [105]:
# Let's fix age less then 7 and more then 100
USERS_DF = USERS_DF.withColumn("Age", when(USERS_DF['Age'] > 100,-1) 
      .when(USERS_DF['Age'] < 7,-1) \
      .otherwise(USERS_DF['Age']))
UsersGroup = USERS_DF.groupBy("Age").count()
UsersGroup.orderBy(UsersGroup["Age"].desc()).show(10)
UsersGroup.orderBy(UsersGroup["Age"].asc()).show(10)

+---+-----+
|Age|count|
+---+-----+
|100|    7|
| 99|    4|
| 98|    2|
| 97|    3|
| 96|    3|
| 95|    1|
| 94|    5|
| 93|   15|
| 92|   11|
| 91|   13|
+---+-----+
only showing top 10 rows

+---+------+
|Age| count|
+---+------+
| -1|112055|
|  7|    27|
|  8|    54|
|  9|    62|
| 10|    84|
| 11|   121|
| 12|   192|
| 13|   885|
| 14|  1962|
| 15|  2383|
+---+------+
only showing top 10 rows



In [106]:
#Cleaning of NULL value UsersID
USERS_DF.filter(col('User-ID').isNull()).show()
USERS_DF=USERS_DF.na.drop(subset=["User-ID"]) 
USERS_DF.filter(col('User-ID').isNull()).show()

+-------+---+----+-------+-------+
|User-ID|Age|City|  State|Country|
+-------+---+----+-------+-------+
|   null| -1|NULL|unknown|unknown|
+-------+---+----+-------+-------+

+-------+---+----+-----+-------+
|User-ID|Age|City|State|Country|
+-------+---+----+-----+-------+
+-------+---+----+-----+-------+



In [107]:
#There aren't Null values in the DataFrame anymore
USERS_DF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in USERS_DF.columns]).show()
USERS_DF.count()

+-------+---+----+-----+-------+
|User-ID|Age|City|State|Country|
+-------+---+----+-----+-------+
|      0|  0|   0|    0|      0|
+-------+---+----+-----+-------+



278858

In [108]:
pip install pycountry

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [109]:
import pycountry
pycountry.countries.search_fuzzy('italy')

[Country(alpha_2='IT', alpha_3='ITA', flag='🇮🇹', name='Italy', numeric='380', official_name='Italian Republic')]

In [110]:
pycountry.countries.search_fuzzy('london')

[Country(alpha_2='GB', alpha_3='GBR', flag='🇬🇧', name='United Kingdom', numeric='826', official_name='United Kingdom of Great Britain and Northern Ireland')]

In [111]:
def convert(location):   # define function here
  try:
    if location:
      lc=pycountry.countries.search_fuzzy(location)
      return lc[0].alpha_3
    else: 
      return None
  except LookupError:
    return None  

cc_udf = udf(convert, StringType()) #register udf

In [112]:
#Create new table that is grouped by countries and ordered by count
UsersCountryGroup = USERS_DF.groupBy("Country").count()
UsersCountryGroup=UsersCountryGroup.orderBy(UsersCountryGroup["count"].desc())
UsersCountryGroup.show(20)

+---------------+------+
|        Country| count|
+---------------+------+
|            usa|139188|
|         canada| 21558|
| united kingdom| 18304|
|        germany| 17052|
|          spain| 13205|
|      australia| 11724|
|          italy| 11246|
|        unknown|  4625|
|         france|  3474|
|       portugal|  3371|
|    new zealand|  3094|
|    netherlands|  3043|
|    switzerland|  1757|
|         brazil|  1677|
|          china|  1464|
|         sweden|  1452|
|          india|  1279|
|        austria|  1147|
|       malaysia|  1102|
|      argentina|  1081|
+---------------+------+
only showing top 20 rows



In [113]:
#Let's use function to find country code
UsersCountryGroup = UsersCountryGroup.withColumn("countries_formatted",cc_udf(UsersCountryGroup.Country))
UsersCountryGroup.show(20)

+---------------+------+-------------------+
|        Country| count|countries_formatted|
+---------------+------+-------------------+
|            usa|139188|                USA|
|         canada| 21558|                CAN|
| united kingdom| 18304|                GBR|
|        germany| 17052|                DEU|
|          spain| 13205|                ESP|
|      australia| 11724|                AUS|
|          italy| 11246|                ITA|
|        unknown|  4625|               null|
|         france|  3474|                FRA|
|       portugal|  3371|                PRT|
|    new zealand|  3094|                NZL|
|    netherlands|  3043|                NLD|
|    switzerland|  1757|                CHE|
|         brazil|  1677|                BRA|
|          china|  1464|                CHN|
|         sweden|  1452|                SWE|
|          india|  1279|                IND|
|        austria|  1147|                AUT|
|       malaysia|  1102|                MYS|
|      arg

In [114]:
#Rename column to make a join
UsersCountryGroup =UsersCountryGroup.withColumnRenamed("Country","Distinct_Country")
UsersCountryGroup.show()

+----------------+------+-------------------+
|Distinct_Country| count|countries_formatted|
+----------------+------+-------------------+
|             usa|139188|                USA|
|          canada| 21558|                CAN|
|  united kingdom| 18304|                GBR|
|         germany| 17052|                DEU|
|           spain| 13205|                ESP|
|       australia| 11724|                AUS|
|           italy| 11246|                ITA|
|         unknown|  4625|               null|
|          france|  3474|                FRA|
|        portugal|  3371|                PRT|
|     new zealand|  3094|                NZL|
|     netherlands|  3043|                NLD|
|     switzerland|  1757|                CHE|
|          brazil|  1677|                BRA|
|           china|  1464|                CHN|
|          sweden|  1452|                SWE|
|           india|  1279|                IND|
|         austria|  1147|                AUT|
|        malaysia|  1102|         

In [115]:
#Join new table with our Users_DF
USERS_DF=USERS_DF.join(UsersCountryGroup, USERS_DF["Country"] == UsersCountryGroup["Distinct_Country"],"left")
USERS_DF.show(20)

+-------+---+--------------+----------------+---------------+----------------+------+-------------------+
|User-ID|Age|          City|           State|        Country|Distinct_Country| count|countries_formatted|
+-------+---+--------------+----------------+---------------+----------------+------+-------------------+
|      1| -1|           nyc|        new york|            usa|             usa|139188|                USA|
|      2| 18|      stockton|      california|            usa|             usa|139188|                USA|
|      3| -1|        moscow| yukon territory|         russia|          russia|   177|                RUS|
|      4| 17|         porto|        v.n.gaia|       portugal|        portugal|  3371|                PRT|
|      5| -1|   farnborough|           hants| united kingdom|  united kingdom| 18304|                GBR|
|      6| 61|  santa monica|      california|            usa|             usa|139188|                USA|
|      7| -1|    washington|              dc| 

In [116]:
#Cleaning table from columns duplicates
USERS_DF=USERS_DF.drop(USERS_DF['Distinct_Country']) 
USERS_DF=USERS_DF.drop(USERS_DF['count']) 

In [117]:
#Creating another table with rows where we couldn't identify country 
Users_NULL_Countries=USERS_DF.where(col("countries_formatted").isNull())
Users_NULL_Countries=Users_NULL_Countries.drop(Users_NULL_Countries['countries_formatted']) 
#This time we group new table by City values
Users_by_city=Users_NULL_Countries.groupBy("City").count() 
Users_by_city=Users_by_city.orderBy(Users_by_city["count"].desc())
Users_by_city.show()

+------------+-----+
|        City|count|
+------------+-----+
|     unknown|  100|
|      milano|   59|
|     toronto|   59|
|   vancouver|   58|
|      london|   51|
|    belgrade|   50|
|        roma|   48|
|    portland|   45|
|     seattle|   38|
|      ottawa|   37|
|   barcelona|   33|
|        rome|   32|
|   san diego|   29|
|christchurch|   29|
|     chicago|   27|
|     houston|   27|
| albuquerque|   26|
|      austin|   25|
|      madrid|   25|
|    novi sad|   24|
+------------+-----+
only showing top 20 rows



In [118]:
#Let's use function to find country code by City name
Users_by_city= Users_by_city.withColumn("countries_formatted",cc_udf(Users_by_city.City))
Users_by_city.show()

+------------+-----+-------------------+
|        City|count|countries_formatted|
+------------+-----+-------------------+
|     unknown|  100|               null|
|     toronto|   59|               null|
|      milano|   59|                ITA|
|   vancouver|   58|               null|
|      london|   51|                GBR|
|    belgrade|   50|               null|
|        roma|   48|                ITA|
|    portland|   45|                JAM|
|     seattle|   38|               null|
|      ottawa|   37|               null|
|   barcelona|   33|                ESP|
|        rome|   32|                CZE|
|   san diego|   29|               null|
|christchurch|   29|                GBR|
|     chicago|   27|               null|
|     houston|   27|               null|
| albuquerque|   26|               null|
|      austin|   25|               null|
|      madrid|   25|                ESP|
|    novi sad|   24|               null|
+------------+-----+-------------------+
only showing top

In [119]:
#Cleaning table from columns duplicates and join with Country code sort by City
Users_by_city=Users_by_city.withColumnRenamed("City","Distinct_City")
Users_NULL_Countries=USERS_DF.where(col("countries_formatted").isNull())
Users_NULL_Countries=Users_NULL_Countries.drop(Users_NULL_Countries['countries_formatted']) 
Users_NULL_Countries=Users_NULL_Countries.join(Users_by_city, Users_NULL_Countries["City"] == Users_by_city["Distinct_City"],"left")
Users_NULL_Countries=Users_NULL_Countries.drop(Users_NULL_Countries['Distinct_City']) 
Users_NULL_Countries=Users_NULL_Countries.drop(Users_NULL_Countries['count']) 
Users_NULL_Countries.show()

+-------+---+-----------------+----------------+------------------+-------------------+
|User-ID|Age|             City|           State|           Country|countries_formatted|
+-------+---+-----------------+----------------+------------------+-------------------+
|    955| -1|       harrisburg|         unknown|           unknown|               null|
|    863| 73|            space|           space| somewherein space|               null|
|    537| -1|            davis|      california|           unknown|               null|
|   1202| 43|           bailey|         unknown|           unknown|               null|
|    318| -1|           berlin|         unknown|           unknown|                DEU|
|    884| -1|           berlin|         germany|           unknown|                DEU|
|   1065| -1|     san fernando|         unknown|           unknown|                TTO|
|    308| 53|            getxo| yukon territory|            espa�a|               null|
|    974| 36|        waterford| 

In [120]:
#Union 2 tables: with Country code sorted by City and with Country code sorted by Country
Users_Countries=USERS_DF.na.drop()
UNION_USERS = Users_Countries.union(Users_NULL_Countries)

In [None]:
#Save file in csv
UNION_USERS.coalesce(1).write.option('header', 'True').csv("/content/drive/MyDrive/BDA Technion/Big Data Project/Users")

In [121]:
#Chech how many null value we still have
UNION_USERS.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in UNION_USERS.columns]).show()

+-------+---+----+-----+-------+-------------------+
|User-ID|Age|City|State|Country|countries_formatted|
+-------+---+----+-----+-------+-------------------+
|      0|  0|   0|    0|      0|               4451|
+-------+---+----+-----+-------+-------------------+



In [122]:
#Check that we don't loose any row
UNION_USERS.count()

278858

# Stop Session

In [124]:
# Stop the session 
spark.stop()