#**Big Data Project - BookCrossing** 📚

###**Preparing work environment**

In [1]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null 
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark 

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad.net/grap

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [3]:
import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2")

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, DecimalType, StringType

Connecting Google Drive to the Colab notebook

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Reading Data**

In [5]:
data_path = "/content/drive/MyDrive/TCBDA/Big Data/Big Data Project"
Bx_Users = data_path + "/" + "BX-Users.csv"
Bx_Books = data_path + "/" + "BX-Books.csv"
Bx_Ratings = data_path + "/" + "BX-Book-Ratings.csv"

**Creating Data Frames for each data file**

In [6]:
Dim_Users = spark.read.csv(Bx_Users, sep=";", header=True, inferSchema=True, encoding="ISO-8859-1")
Dim_Books = spark.read.csv(Bx_Books, sep=";", header=True, inferSchema=True, encoding="ISO-8859-1")

Fact_Ratings - the ratings file contains several different separators between fields, so each line must be split and cleaned before the data is imported.

In [7]:
def parsingInput(line):
    """Split one raw line on ';' and strip quotes, commas, and newlines from each field."""
    fields_data = []

    fields = line.split(";")
    for field in fields:
      field = field.replace('"', '')
      field = field.replace(',', '')
      field = field.replace('\n', '')
      fields_data.append(field)
    
    return fields_data

columns = []
fields_data = []

# The with-statement closes the file automatically, so no explicit close() is needed
with open(Bx_Ratings, mode='r', encoding="ISO-8859-1") as file:
  columns = parsingInput(file.readline())   # first line holds the column names
  other_lines = file.readlines()

for line in other_lines:
  fields_data.append(parsingInput(line))

Fact_Ratings = spark.createDataFrame(fields_data, columns)
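
The cleaning applied to each line can be checked on a single string before loading the whole file. The following is a minimal sketch in plain Python: `parse_line` is a hypothetical stand-in for the function above, and the sample line is made up for illustration rather than taken from BX-Book-Ratings.csv.

```python
def parse_line(line):
    """Split a raw line on ';' and strip quotes, commas, and newlines from each field."""
    cleaned = []
    for field in line.split(";"):
        for unwanted in ('"', ",", "\n"):
            field = field.replace(unwanted, "")
        cleaned.append(field)
    return cleaned


# Made-up sample line: quoted fields followed by stray trailing commas
sample = '"276725";"034545104X";"0",,\n'
print(parse_line(sample))
```

Because commas are removed from every field, this approach is only safe when no field legitimately contains a comma, which holds for the three ID and rating fields used here.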

###**Data Cleansing**

**Dim_Users**

In [8]:
Dim_Users.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



In [9]:
Dim_Users.show()

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|  17|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|  61|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|  26|
|     11|melbourne, victor...|  14|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|  26|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|  25|
|     19|           weston, ,|  14|
|     20|langhorne, pennsy...|  19|
+-------+--------------------+----+
only showing top 20 rows



In [10]:
Dim_Users.count()

278859

In [11]:
Dim_Users=Dim_Users.withColumnRenamed('User-ID','UserID')

Converting the values in all dataframe columns to lowercase (this also turns every column into a string)

In [12]:
for c in Dim_Users.columns:
  Dim_Users=Dim_Users.select("*", f.lower(Dim_Users[c]))
  Dim_Users = Dim_Users.drop(c)
  Dim_Users = Dim_Users.withColumnRenamed('lower('+c+')', c)

Dim_Users.show()

+------+--------------------+----+
|UserID|            Location| Age|
+------+--------------------+----+
|     1|  nyc, new york, usa|null|
|     2|stockton, califor...|  18|
|     3|moscow, yukon ter...|null|
|     4|porto, v.n.gaia, ...|  17|
|     5|farnborough, hant...|null|
|     6|santa monica, cal...|  61|
|     7| washington, dc, usa|null|
|     8|timmins, ontario,...|null|
|     9|germantown, tenne...|null|
|    10|albacete, wiscons...|  26|
|    11|melbourne, victor...|  14|
|    12|fort bragg, calif...|null|
|    13|barcelona, barcel...|  26|
|    14|mediapolis, iowa,...|null|
|    15|calgary, alberta,...|null|
|    16|albuquerque, new ...|null|
|    17|chesapeake, virgi...|null|
|    18|rio de janeiro, r...|  25|
|    19|           weston, ,|  14|
|    20|langhorne, pennsy...|  19|
+------+--------------------+----+
only showing top 20 rows



Converting datatypes

In [13]:
Dim_Users = Dim_Users.withColumn('UserID', Dim_Users.UserID.cast(IntegerType()))
Dim_Users = Dim_Users.withColumn('Age', Dim_Users.Age.cast(DecimalType()))

Dim_Users.dtypes

[('UserID', 'int'), ('Location', 'string'), ('Age', 'decimal(10,0)')]

**Location column** - splitting the column into three columns: city, region, and country.<br>
Dropping the original Location column.

In [14]:
split_location = f.split(Dim_Users.Location, ',')
Dim_Users = Dim_Users.withColumn('UserCity', split_location.getItem(0))
Dim_Users = Dim_Users.withColumn('UserRegion', split_location.getItem(1))
Dim_Users = Dim_Users.withColumn('UserCountry', split_location.getItem(2))
Dim_Users = Dim_Users.drop('Location')

Cleaning punctuation in location columns

In [15]:
punc_regex = '''(^ |\!|\(|\)|-|\[|\]|\{|\}|;|:|,|'|"|\<|\>|\.|\/|\?|@|�|#|\$|%|\^|\&|\*|_|~|\d| $)'''

Dim_Users=Dim_Users.withColumn('UserCity', f.regexp_replace('UserCity', punc_regex, ''))
Dim_Users=Dim_Users.withColumn('UserRegion', f.regexp_replace('UserRegion', punc_regex, ''))
Dim_Users=Dim_Users.withColumn('UserCountry', f.regexp_replace('UserCountry', punc_regex, ''))
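
The split-then-clean steps above can be tried out on individual strings. This is a minimal sketch in plain Python, not PySpark; `split_location` is a hypothetical helper, and `PUNCT` is a simplified pattern assumed to be close to, but not identical with, `punc_regex`.

```python
import re

# Simplified version of the notebook's pattern (an assumption, not the exact regex):
# one leading or trailing space, common punctuation, and digits.
PUNCT = re.compile(r"^ | $|[!()\-\[\]{};:,'\"<>./?@#$%^&*_~\d]")


def split_location(location):
    """Return (city, region, country); parts that are missing become None."""
    parts = location.split(",")
    parts += [None] * (3 - len(parts))  # pad locations with fewer than three parts
    return tuple(
        PUNCT.sub("", part) if part is not None else None
        for part in parts[:3]
    )


print(split_location("porto, v.n.gaia, portugal"))
print(split_location("weston, ,"))
```

The second call shows why the following cells look for empty strings as well as nulls: a location such as `weston, ,` yields empty region and country values rather than missing ones.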

Finding cells with a null value and empty cells in all columns

In [16]:
Dim_Users.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Users.columns]).show()

+------+------+--------+----------+-----------+
|UserID|   Age|UserCity|UserRegion|UserCountry|
+------+------+--------+----------+-----------+
|     1|110763|     327|      4022|       4641|
+------+------+--------+----------+-----------+
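
The per-column count above treats several different things as "missing". A minimal plain-Python sketch of the same idea, using made-up toy rows and a hypothetical `is_missing` helper, may make the rule easier to read:

```python
import math


def is_missing(value):
    """True for None, NaN, empty or single-space strings, or text containing 'None'/'NULL'."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    text = str(value)
    return text in ("", " ") or "None" in text or "NULL" in text


# Toy rows (made up): (UserID, Age, UserCity)
rows = [(1, None, "nyc"), (2, 18, ""), (None, 25, "NULL"), (4, 61, "porto")]
columns = ["UserID", "Age", "UserCity"]

# Count the missing values column by column
missing_per_column = {
    name: sum(is_missing(row[i]) for row in rows)
    for i, name in enumerate(columns)
}
print(missing_per_column)
```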



Age column - handling values that are unusable for analysis


In [17]:
Dim_Users.describe(["Age"]).show()

+-------+-----------------+
|summary|              Age|
+-------+-----------------+
|  count|           168096|
|   mean|          34.7514|
| stddev|14.42809738245543|
|    min|                0|
|    max|              244|
+-------+-----------------+



Replacing ages above 100, ages below 10, and missing ages with the placeholder value -1

In [18]:
Dim_Users = Dim_Users.withColumn("Age", f.when((Dim_Users.Age > 100)|(Dim_Users.Age<10)|(Dim_Users["Age"].isNull()),-1).otherwise(Dim_Users.Age))
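
The replacement rule can be expressed as a small function. This is a plain-Python sketch under the same thresholds; `clean_age` and its parameter names are hypothetical.

```python
def clean_age(age, low=10, high=100, sentinel=-1):
    """Return the sentinel for missing or implausible ages, otherwise the age itself."""
    if age is None or age < low or age > high:
        return sentinel
    return age


print([clean_age(a) for a in [None, 0, 9, 10, 34, 100, 244]])
```

Since -1 is a placeholder rather than a real age, any later statistic on the column (a mean, for example) needs to exclude rows where the age equals -1.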

Verifying transformation

In [19]:
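Dim_Users.createOrReplaceTempView("Users")
# Counts any age that is still null, above 100, or below 10 without being the -1 placeholder
spark.sql('select * from Users where Age is null or Age > 100 or (Age < 10 and Age <> -1)').count()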

0

In [20]:
Dim_Users.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Users.columns]).show()

+------+---+--------+----------+-----------+
|UserID|Age|UserCity|UserRegion|UserCountry|
+------+---+--------+----------+-----------+
|     1|  0|     327|      4022|       4641|
+------+---+--------+----------+-----------+



Detecting the single row with a null value in the UserID column as shown in the result.

In [21]:
Dim_Users.filter(f.col('UserID').contains('None') | \
                            f.col('UserID').contains('NULL') | \
                            (f.col('UserID') == '' ) | \
                            (f.col('UserID') == ' ' ) | \
                            f.col('UserID').isNull() | \
                            f.isnan('UserID')).show()

+------+---+--------+----------+-----------+
|UserID|Age|UserCity|UserRegion|UserCountry|
+------+---+--------+----------+-----------+
|  null| -1|    null|      null|       null|
+------+---+--------+----------+-----------+



Dropping the row due to the lack of data.

In [22]:
Dim_Users=Dim_Users.filter(f.col('UserID').isNotNull())

**UserCity column** - handling unusable values and empty cells for analysis

In [23]:
Dim_Users.sort(Dim_Users.UserCity.desc()).show()

+------+---+--------+---------------+-----------+
|UserID|Age|UserCity|     UserRegion|UserCountry|
+------+---+--------+---------------+-----------+
|184164| 30|   ýzmýr|             na|     turkey|
|268024| 27|   ýzmir|    connecticut|     turkey|
|278595| -1|   ýzmir|             na|     turkey|
|106234| 20|   ýzmir|             na|     turkey|
|188910| -1|   ýzmir|             na|     turkey|
|  6162| 25|   ýzmir|             na|     turkey|
| 19675| -1|ýstanbul|             na|     turkey|
| 82759| 25|ýstanbul|       ýstanbul|     turkey|
| 42853| -1|ýstanbul|             uk|     turkey|
|274464| 25|ýstanbul|       ýstanbul|     turkey|
| 14246| -1|ýstanbul|             na|     turkey|
|125152| -1|ýstanbul|             na|     turkey|
| 36095| -1|ýstanbul|             na|     turkey|
| 79393| -1|ýstanbul|        ontario|     turkey|
| 36670| -1|ýstanbul| marmara region|     turkey|
|218117| 25|ýstanbul|             na|     turkey|
|221792| -1|ýstanbul|        marmara|     turkey|


In [24]:
Dim_Users = Dim_Users.withColumn("UserCity", f.when(Dim_Users.UserCity=='na','unknown').otherwise(Dim_Users.UserCity))

In [25]:
Dim_Users = Dim_Users.withColumn("UserCity",f.when((f.col("UserCity") == '')|(f.col("UserCity") == ' ')|f.col("UserCity").contains('None')|\
                                                   (f.col("UserCity").contains('NULL'))|f.col("UserCity").isNull()|f.isnan("UserCity"),'unknown').otherwise(f.col("UserCity")))

Verifying that the replacement was performed completely

In [26]:
Dim_Users.filter(f.col('UserCity').like ("")|f.col('UserCity').like (" ")|\
                 (Dim_Users.UserCity=='na')| f.col("UserCity").contains('None')|f.col("UserCity").contains('NULL')|\
                 f.col("UserCity").isNull()|f.isnan("UserCity")).count()

0

**UserRegion column** - handling unusable values and empty cells for analysis

In [27]:
Dim_Users = Dim_Users.withColumn("UserRegion", f.when(Dim_Users.UserRegion=='na','unknown').otherwise(Dim_Users.UserRegion))

In [28]:
Dim_Users = Dim_Users.withColumn("UserRegion",f.when((f.col("UserRegion") == '')|(f.col("UserRegion") == ' ')|f.col("UserRegion").contains('None')|\
                                                   (f.col("UserRegion").contains('NULL'))|f.col("UserRegion").isNull()|f.isnan("UserRegion"),'unknown').otherwise(f.col("UserRegion")))

Verifying that the replacement was performed completely

In [29]:
Dim_Users.filter(f.col('UserRegion').like ("")|f.col('UserRegion').like (" ")|\
                 (Dim_Users.UserRegion=='na')| f.col("UserRegion").contains('None')|f.col("UserRegion").contains('NULL')|\
                 f.col("UserRegion").isNull()|f.isnan("UserRegion")).count()

0

**UserCountry column**

Downloading the "countries" file from GitHub for data cleansing in the UserCountry column

In [30]:
! wget "https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv"

--2022-04-12 11:40:28--  https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20759 (20K) [text/plain]
Saving to: ‘all.csv.1’


2022-04-12 11:40:28 (14.2 MB/s) - ‘all.csv.1’ saved [20759/20759]



Converting the file into a dataframe

In [31]:
# The file is UTF-8 encoded; reading it as ISO-8859-1 garbles names such as "Åland Islands"
countries = spark.read.csv('/content/all.csv', header=True, inferSchema=True, encoding="UTF-8")
countries.show()

+-------------------+-------+-------+------------+-------------+--------+--------------------+-------------------+-----------+---------------+------------------------+
|               name|alpha-2|alpha-3|country-code|   iso_3166-2|  region|          sub-region|intermediate-region|region-code|sub-region-code|intermediate-region-code|
+-------------------+-------+-------+------------+-------------+--------+--------------------+-------------------+-----------+---------------+------------------------+
|        Afghanistan|     AF|    AFG|           4|ISO 3166-2:AF|    Asia|       Southern Asia|               null|        142|             34|                    null|
|      Åland Islands|     AX|    ALA|         248|ISO 3166-2:AX|  Europe|     Northern Europe|               null|        150|            154|                    null|
|            Albania|     AL|    ALB|           8|ISO 3166-2:AL|  Europe|     Southern Europe|               null|        150|             39|                  

Dropping unnecessary columns

In [32]:
countries = countries.drop('country-code','iso_3166-2','region','sub-region','intermediate-region','region-code','sub-region-code','intermediate-region-code')
countries.show()

+-------------------+-------+-------+
|               name|alpha-2|alpha-3|
+-------------------+-------+-------+
|        Afghanistan|     AF|    AFG|
|      Åland Islands|     AX|    ALA|
|            Albania|     AL|    ALB|
|            Algeria|     DZ|    DZA|
|     American Samoa|     AS|    ASM|
|            Andorra|     AD|    AND|
|             Angola|     AO|    AGO|
|           Anguilla|     AI|    AIA|
|         Antarctica|     AQ|    ATA|
|Antigua and Barbuda|     AG|    ATG|
|          Argentina|     AR|    ARG|
|            Armenia|     AM|    ARM|
|              Aruba|     AW|    ABW|
|          Australia|     AU|    AUS|
|            Austria|     AT|    AUT|
|         Azerbaijan|     AZ|    AZE|
|            Bahamas|     BS|    BHS|
|            Bahrain|     BH|    BHR|
|         Bangladesh|     BD|    BGD|
|           Barbados|     BB|    BRB|
+-------------------+-------+-------+
only showing top 20 rows



Converting the values in the countries dataframe to lowercase

In [33]:
for c in countries.columns:
  countries=countries.select("*", f.lower(countries[c]))
  countries = countries.drop(c)
  countries = countries.withColumnRenamed('lower('+c+')', c)

countries.show()

+-------------------+-------+-------+
|               name|alpha-2|alpha-3|
+-------------------+-------+-------+
|        afghanistan|     af|    afg|
|      åland islands|     ax|    ala|
|            albania|     al|    alb|
|            algeria|     dz|    dza|
|     american samoa|     as|    asm|
|            andorra|     ad|    and|
|             angola|     ao|    ago|
|           anguilla|     ai|    aia|
|         antarctica|     aq|    ata|
|antigua and barbuda|     ag|    atg|
|          argentina|     ar|    arg|
|            armenia|     am|    arm|
|              aruba|     aw|    abw|
|          australia|     au|    aus|
|            austria|     at|    aut|
|         azerbaijan|     az|    aze|
|            bahamas|     bs|    bhs|
|            bahrain|     bh|    bhr|
|         bangladesh|     bd|    bgd|
|           barbados|     bb|    brb|
+-------------------+-------+-------+
only showing top 20 rows



Generating lists from countries' dataframe columns

In [34]:
countryname = countries.select('name').collect()
alpha2 = countries.select('alpha-2').collect()
alpha3 = countries.select('alpha-3').collect()

Generating a dictionary whose keys are the canonical country names from the countries file and whose values are the sets of matching variants found in Dim_Users

In [35]:
usercountries = Dim_Users.select('UserCountry').collect()

newCountries = {}
for country in usercountries:
  if country in countryname:
    # Already a canonical country name, so nothing needs to be mapped
    continue
  elif country in alpha2:
    # Two-letter code: take the country name at the same position
    index = alpha2.index(country)
    selectedCountry = countryname[index][0]
  elif country in alpha3:
    # Three-letter code: take the country name at the same position
    index = alpha3.index(country)
    selectedCountry = countryname[index][0]
  else: 
    # Neither a known name nor a known code
    selectedCountry = 'unknown'

  # Collect every raw variant under its canonical name
  newCountries.setdefault(selectedCountry, set()).add(country[0])


newCountries.keys()

dict_keys(['united states of america', 'unknown', 'namibia', 'germany', 'canada', 'panama', 'china', 'ukraine', 'ghana', 'azerbaijan', 'united kingdom of great britain and northern ireland', 'puerto rico', 'malaysia', 'indonesia', 'tunisia', 'kazakhstan', 'new zealand', 'india', 'bosnia and herzegovina', 'israel', 'netherlands', 'holy see'])
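
The matching logic can be followed more easily on a tiny reference table. The sketch below is plain Python with made-up toy data; `build_variants` and `normalize` are hypothetical helpers that mirror the idea (name first, then code, otherwise "unknown"), not the notebook's exact code.

```python
# Toy reference table (made-up subset): (name, alpha-2, alpha-3), all lowercase
reference = [
    ("united states of america", "us", "usa"),
    ("germany", "de", "deu"),
    ("canada", "ca", "can"),
]
names = {name for name, _, _ in reference}
by_code = {}
for name, alpha2, alpha3 in reference:
    by_code[alpha2] = name
    by_code[alpha3] = name


def build_variants(user_countries):
    """Map each canonical name (or 'unknown') to the set of raw variants seen."""
    variants = {}
    for raw in user_countries:
        if raw in names:
            continue  # already canonical
        canonical = by_code.get(raw, "unknown")
        variants.setdefault(canonical, set()).add(raw)
    return variants


def normalize(raw, variants):
    """Replace a raw value with its canonical name; keep canonical names as-is."""
    if raw is None:
        return "unknown"
    for canonical, raws in variants.items():
        if raw in raws:
            return canonical
    return raw


seen = ["usa", "germany", "de", "united kingdom", None, "can"]
variants = build_variants(seen)
print([normalize(c, variants) for c in seen])
```

In this toy run, "united kingdom" ends up as "unknown" because it is neither a name nor a code in the reference table. Any spelling that differs from the reference name is handled the same way, and a value that merely coincides with a country code is mapped to that country.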

Updating the UserCountry column with the keys of the newCountries dictionary

In [36]:
condition = f.when(f.col('UserCountry').isNull(), 'unknown')

for country in newCountries.keys():
  optionalVariations = list(newCountries[country])
  condition = condition.when(f.col('UserCountry').isin(optionalVariations), country)

condition = condition.otherwise(f.col('UserCountry'))
Dim_Users = Dim_Users.withColumn('UserCountry', condition)


Dim_Users.show()


+------+---+--------------+---------------+--------------------+
|UserID|Age|      UserCity|     UserRegion|         UserCountry|
+------+---+--------------+---------------+--------------------+
|     1| -1|           nyc|       new york|united states of ...|
|     2| 18|      stockton|     california|united states of ...|
|     3| -1|        moscow|yukon territory|             unknown|
|     4| 17|         porto|         vngaia|            portugal|
|     5| -1|   farnborough|          hants|             unknown|
|     6| 61|  santa monica|     california|united states of ...|
|     7| -1|    washington|             dc|united states of ...|
|     8| -1|       timmins|        ontario|              canada|
|     9| -1|    germantown|      tennessee|united states of ...|
|    10| 26|      albacete|      wisconsin|               spain|
|    11| 14|     melbourne|       victoria|           australia|
|    12| -1|    fort bragg|     california|united states of ...|
|    13| 26|     barcelon

Finding the number of unknown countries as a result of the update

In [37]:
Dim_Users.filter(Dim_Users['UserCountry'].like ('unknown')).count()

26837

 <u> Verifying transformation </u> <br> Whether any empty cells remain in Dim_Users

In [39]:
Dim_Users.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Users.columns]).show()

+------+---+--------+----------+-----------+
|UserID|Age|UserCity|UserRegion|UserCountry|
+------+---+--------+----------+-----------+
|     0|  0|       0|         0|          0|
+------+---+--------+----------+-----------+



Checking duplicates

In [40]:
dup = Dim_Users.join(
    Dim_Users.groupBy(Dim_Users.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Users.columns,
    how="inner")

dup.select(f.sum("Duplicate_indicator")).show()

+------------------------+
|sum(Duplicate_indicator)|
+------------------------+
|                       0|
+------------------------+
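
The number reported above is the count of rows that belong to a group of identical rows, not the number of distinct duplicated groups. A plain-Python sketch on made-up toy rows illustrates the same quantity:

```python
from collections import Counter

# Toy rows (made up); the second and fourth rows are identical
rows = [
    (1, "nyc", "usa"),
    (2, "porto", "portugal"),
    (3, "moscow", "unknown"),
    (2, "porto", "portugal"),
]

occurrences = Counter(rows)
# Rows whose exact copy appears more than once, counted once per copy
rows_in_duplicate_groups = sum(1 for row in rows if occurrences[row] > 1)
print(rows_in_duplicate_groups)
```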



**Dim_Books**

In [41]:
Dim_Books.printSchema()

root
 |-- ISBN: string (nullable = true)
 |-- Book-Title: string (nullable = true)
 |-- Book-Author: string (nullable = true)
 |-- Year-Of-Publication: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Image-URL-S: string (nullable = true)
 |-- Image-URL-M: string (nullable = true)
 |-- Image-URL-L: string (nullable = true)



In [42]:
Dim_Books.count()

271379

In [43]:
Dim_Books = Dim_Books.withColumnRenamed('Book-Title', 'BookTitle')
Dim_Books = Dim_Books.withColumnRenamed('Book-Author', 'BookAuthor')
Dim_Books = Dim_Books.withColumnRenamed('Year-Of-Publication', 'YearOfPublication')

Dropping columns that are irrelevant to the analysis

In [44]:
Dim_Books = Dim_Books.drop('Image-URL-S','Image-URL-M','Image-URL-L')

Converting the values in all dataframe columns to lowercase (this also turns every column into a string)

In [45]:
for c in Dim_Books.columns:
  Dim_Books = Dim_Books.select("*", f.lower(Dim_Books[c]))
  Dim_Books = Dim_Books.drop(c)
  Dim_Books = Dim_Books.withColumnRenamed('lower('+c+')', c)

Converting the YearOfPublication column back to an integer, since the lowercase conversion turned every column into a string

In [46]:
Dim_Books = Dim_Books.withColumn('YearOfPublication', Dim_Books['YearOfPublication'].cast(IntegerType()))

Cleaning punctuation in columns with a string datatype (excluding ISBN)

In [47]:
punc_regex = '''(^ |\!|\(|\)|-|\[|\]|\{|\}|;|:|,|'|"|\<|\>|\.|\/|\?|@|�|#|\$|%|\^|\&|\*|_|~|\d| $)'''

Dim_Books=Dim_Books.withColumn('BookTitle', f.regexp_replace('BookTitle', punc_regex, ''))
Dim_Books=Dim_Books.withColumn('BookAuthor', f.regexp_replace('BookAuthor', punc_regex, ''))
Dim_Books=Dim_Books.withColumn('Publisher', f.regexp_replace('Publisher', punc_regex, ''))

In [48]:
Dim_Books.show()

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| classical mythology|    mark p o morford|             2002|oxford university...|
|0002005018|        clara callan|richard bruce wright|             2001|harperflamingo ca...|
|0060973129|decision in normandy|         carlo deste|             1991|     harperperennial|
|0374157065|flu the story of ...|    gina bari kolata|             1999|farrar straus giroux|
|0393045218|the mummies of ur...|        e j w barber|             1999|w w norton amp co...|
|0399135782|the kitchen gods ...|             amy tan|             1991|    putnam pub group|
|0425176428|what if the world...|       robert cowley|             2000|berkley publishin...|
|0671870432|     pleading guilty|         scott turow|      

Finding cells with a null value and empty  cells

In [49]:
Dim_Books.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Books.columns]).show()

+----+---------+----------+-----------------+---------+
|ISBN|BookTitle|BookAuthor|YearOfPublication|Publisher|
+----+---------+----------+-----------------+---------+
|   0|       50|         0|                0|       56|
+----+---------+----------+-----------------+---------+



BookTitle column - handling empty cells

In [50]:
Dim_Books = Dim_Books.withColumn("BookTitle",f.when((f.col("BookTitle") == '')|(f.col("BookTitle") == ' ')|f.col("BookTitle").contains('None')|\
                                                   (f.col("BookTitle").contains('NULL'))|f.col("BookTitle").isNull()|f.isnan("BookTitle"),'unknown').otherwise(f.col("BookTitle")))

Verifying that the replacement was performed completely

In [51]:
Dim_Books.filter(f.col('BookTitle').like ("")|f.col('BookTitle').like (" ")|\
                 f.col("BookTitle").contains('None')|f.col("BookTitle").contains('NULL')|\
                 f.col("BookTitle").isNull()|f.isnan("BookTitle")).count()

0

Publisher column - handling empty cells

In [52]:
Dim_Books = Dim_Books.withColumn("Publisher",f.when((f.col("Publisher") == '')|(f.col("Publisher") == ' ')|f.col("Publisher").contains('None')|\
                                                   (f.col("Publisher").contains('NULL'))|f.col("Publisher").isNull()|f.isnan("Publisher"),'unknown').otherwise(f.col("Publisher")))

Verifying that the replacement was performed completely

In [53]:
Dim_Books.filter(f.col('Publisher').like ("")|f.col('Publisher').like (" ")|\
                 f.col("Publisher").contains('None')|f.col("Publisher").contains('NULL')|\
                 f.col("Publisher").isNull()|f.isnan("Publisher")).count()

0

Year-Of-Publication - handling unusable values for analysis

In [54]:
Dim_Books.describe(["YearOfPublication"]).show()

+-------+------------------+
|summary| YearOfPublication|
+-------+------------------+
|  count|            271379|
|   mean|1959.7560496574902|
| stddev| 258.0113625638109|
|    min|                 0|
|    max|              2050|
+-------+------------------+



Replacing publication years later than 2022 and years equal to 0 with the placeholder value 1900

In [55]:
Dim_Books = Dim_Books.withColumn('YearOfPublication', f.when((Dim_Books['YearOfPublication'] > 2022)|(Dim_Books['YearOfPublication']==0),1900).otherwise(Dim_Books['YearOfPublication']))

In [56]:
Dim_Books.show()

+----------+--------------------+--------------------+-----------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|
+----------+--------------------+--------------------+-----------------+--------------------+
|0195153448| classical mythology|    mark p o morford|             2002|oxford university...|
|0002005018|        clara callan|richard bruce wright|             2001|harperflamingo ca...|
|0060973129|decision in normandy|         carlo deste|             1991|     harperperennial|
|0374157065|flu the story of ...|    gina bari kolata|             1999|farrar straus giroux|
|0393045218|the mummies of ur...|        e j w barber|             1999|w w norton amp co...|
|0399135782|the kitchen gods ...|             amy tan|             1991|    putnam pub group|
|0425176428|what if the world...|       robert cowley|             2000|berkley publishin...|
|0671870432|     pleading guilty|         scott turow|      

 <u> Verifying transformation </u> <br> Whether any empty cells remain in Dim_Books

In [57]:
Dim_Books.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Dim_Books.columns]).show()

+----+---------+----------+-----------------+---------+
|ISBN|BookTitle|BookAuthor|YearOfPublication|Publisher|
+----+---------+----------+-----------------+---------+
|   0|        0|         0|                0|        0|
+----+---------+----------+-----------------+---------+



Checking duplicates

In [58]:
dup = Dim_Books.join(
    Dim_Books.groupBy(Dim_Books.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Books.columns,
    how="inner")

dup.select(f.sum("Duplicate_indicator")).show()

+------------------------+
|sum(Duplicate_indicator)|
+------------------------+
|                     626|
+------------------------+



Dropping duplicates (filtering on the indicator removes every copy of a duplicated row, not only the extra copies)

In [59]:
Dim_Books=Dim_Books.join(
    Dim_Books.groupBy(Dim_Books.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Dim_Books.columns,
    how="inner")

In [60]:
Dim_Books=Dim_Books.filter(Dim_Books['Duplicate_indicator']==0)

In [61]:
Dim_Books = Dim_Books.drop('Duplicate_indicator')
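
Filtering on the indicator and keeping one copy per group give different results. The plain-Python sketch below contrasts the two on made-up toy rows. In PySpark, `DataFrame.dropDuplicates()` is the built-in way to keep one row per group, should that behaviour be preferred.

```python
from collections import Counter

# Toy rows (made up); "0001" appears twice with identical values
books = [
    ("0001", "clara callan"),
    ("0002", "classical mythology"),
    ("0001", "clara callan"),
]

occurrences = Counter(books)

# Filtering on the indicator: every copy of a duplicated row is removed
dropped_all = [row for row in books if occurrences[row] == 1]

# Keeping the first copy of each row instead (order-preserving)
kept_one = list(dict.fromkeys(books))

print(dropped_all)
print(kept_one)
```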

In [62]:
Dim_Books.printSchema()

root
 |-- ISBN: string (nullable = true)
 |-- BookTitle: string (nullable = true)
 |-- BookAuthor: string (nullable = true)
 |-- YearOfPublication: integer (nullable = true)
 |-- Publisher: string (nullable = true)



**Fact_Ratings**

In [63]:
Fact_Ratings.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



In [64]:
Fact_Ratings = Fact_Ratings.withColumnRenamed('User-ID', 'UserID')
Fact_Ratings = Fact_Ratings.withColumnRenamed('Book-Rating', 'BookRating')

In [65]:
Fact_Ratings = Fact_Ratings.withColumn('UserID', Fact_Ratings['UserID'].cast(IntegerType()))
Fact_Ratings = Fact_Ratings.withColumn('BookRating', Fact_Ratings['BookRating'].cast(IntegerType()))

In [66]:
Fact_Ratings.count()

1048575

In [67]:
Fact_Ratings.show()

+------+----------+----------+
|UserID|      ISBN|BookRating|
+------+----------+----------+
|276725|034545104X|         0|
|276726|0155061224|         5|
|276727|0446520802|         0|
|276729|052165615X|         3|
|276729|0521795028|         6|
|276733|2080674722|         0|
|276736|3257224281|         8|
|276737|0600570967|         6|
|276744|038550120X|         7|
|276745| 342310538|        10|
|276746|0425115801|         0|
|276746|0449006522|         0|
|276746|0553561618|         0|
|276746|055356451X|         0|
|276746|0786013990|         0|
|276746|0786014512|         0|
|276747|0060517794|         9|
|276747|0451192001|         0|
|276747|0609801279|         0|
|276747|0671537458|         9|
+------+----------+----------+
only showing top 20 rows



Finding cells with a null value and empty cells

In [68]:
Fact_Ratings.select([f.count(f.when(f.col(c).contains('None') |\
                                 f.col(c).contains('NULL') | \
                                 (f.col(c) == '' ) | \
                                 (f.col(c) == ' ' ) | \
                                 f.col(c).isNull() | \
                                 f.isnan(c), c)).alias(c) for c in Fact_Ratings.columns]).show()

+------+----+----------+
|UserID|ISBN|BookRating|
+------+----+----------+
|     0|   0|         0|
+------+----+----------+



Sorting by UserID column

In [69]:
Fact_Ratings =Fact_Ratings.sort("UserID")
Fact_Ratings.show()

+------+----------+----------+
|UserID|      ISBN|BookRating|
+------+----------+----------+
|     2|0195153448|         0|
|     7| 034542252|         0|
|     8|1881320189|         7|
|     8|1575663937|         6|
|     8|0399135782|         0|
|     8|074322678X|         5|
|     8|0887841740|         5|
|     8|1567407781|         6|
|     8|0393045218|         0|
|     8|0771025661|         0|
|     8|0671870432|         0|
|     8|0374157065|         0|
|     8|0771074670|         0|
|     8|080652121X|         0|
|     8|1558746218|         0|
|     8|0002005018|         5|
|     8|0425176428|         0|
|     8|0060973129|         0|
|     8|1552041778|         5|
|     8|0679425608|         0|
+------+----------+----------+
only showing top 20 rows



Checking duplicates

In [70]:
dup = Fact_Ratings.join(
    Fact_Ratings.groupBy(Fact_Ratings.columns).agg((f.count("*")>1).cast(IntegerType()).alias("Duplicate_indicator")),
    on=Fact_Ratings.columns,
    how="inner")

dup.select(f.sum("Duplicate_indicator")).show()

+------------------------+
|sum(Duplicate_indicator)|
+------------------------+
|                       0|
+------------------------+
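The duplicate check groups on every column, flags groups with more than one row, joins the flag back onto the original rows, and sums it. The sum is therefore the number of rows that belong to a duplicated group. A small plain-Python sketch of the same counting logic, on made-up rows (the sample data is an assumption, not taken from the dataset):

```python
from collections import Counter

# Hypothetical (UserID, ISBN, BookRating) rows
rows = [
    (8, "0002005018", 5),
    (8, "0002005018", 5),  # exact duplicate of the row above
    (8, "0425176428", 0),
    (9, "0002005018", 5),  # same ISBN and rating, different user: not a duplicate
]

# Size of each group of identical rows (the groupBy + count step)
group_sizes = Counter(rows)

# Each original row contributes 1 if its group has more than one member
duplicate_indicator_sum = sum(1 for row in rows if group_sizes[row] > 1)
print(duplicate_indicator_sum)
```

With this sample the sum is 2, because both copies of the repeated row are flagged. A sum of 0, as in the real output above, means every row is unique.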



##**A brief summary of our data - Average Book Ratings**
(excluding books that were rated 10 times or fewer)

In [71]:
# Creating the BookNames dict (ISBN -> BookTitle); collect() pulls every row to the driver
BookNames = {row['ISBN']:row['BookTitle'] for row in Dim_Books.collect()}

In [72]:
# Creating 2 Lists from "BookNames" Dictionary
a = list(BookNames.keys())
b = list(BookNames.values())

# Creating one list of tuples from a,b lists
listOfTuples =  list(zip(a,b))


# Creating a list to help define the columns
columns = ['ISBN', 'BookTitle']

# Creating dataframe
df_BookNames = spark.createDataFrame(listOfTuples,columns)
df_BookNames.show()

+----------+--------------------+
|      ISBN|           BookTitle|
+----------+--------------------+
|0001837397|autumn story bram...|
|0001941968|little grey rabbi...|
|0006173454|power and the pro...|
|000638255x|the austrians a t...|
|0006475272|fresh girls and o...|
|0006530540|         blood royal|
|0006645097|a goodnight kind ...|
|0006713203|the demon bike rider|
|0007117019|lillian toos irre...|
|0020280505|how to stay alive...|
|0026715406|   working with wood|
|0028638263|the complete idio...|
|0030045541|the cave of the l...|
|0030206324|experimental orga...|
|0030635543|the time of the a...|
|0039282783|the miraculous hi...|
|0060094095|air battle force ...|
|006011875x|demian the story ...|
|0060145064|love and pasta a ...|
|0060164182|eisenhower a cent...|
+----------+--------------------+
only showing top 20 rows
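One side effect of going through a dictionary is worth spelling out: dictionary keys are unique, so if the same ISBN appears more than once, only the last title seen survives. The sketch below shows this on made-up rows in plain Python. The duplicate entry is hypothetical, and whether `Dim_Books` actually contains repeated ISBNs is not shown in this section.

```python
# Hypothetical rows standing in for Dim_Books.collect()
book_rows = [
    {"ISBN": "0006530540", "BookTitle": "blood royal"},
    {"ISBN": "0026715406", "BookTitle": "working with wood"},
    {"ISBN": "0006530540", "BookTitle": "blood royal (reissue)"},  # made-up repeat
]

# Same comprehension as in the notebook: a later row overwrites an earlier one
book_names = {row["ISBN"]: row["BookTitle"] for row in book_rows}

# Equivalent to list(zip(keys, values)) in the cell above
list_of_tuples = list(book_names.items())
print(list_of_tuples)
```

The result has one tuple per distinct ISBN, which is what keeps the later join from multiplying rating rows.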



In [75]:
# Computing average rating for each ISBN(Book ID)
averageRatings = Fact_Ratings.groupBy('ISBN').avg('BookRating')

# Computing count of ratings for each ISBN
counts = Fact_Ratings.groupBy('ISBN').count()

# Join the two together
averagesAndCounts = counts.join(averageRatings, 'ISBN')

# Keeping only books rated more than 10 times
popularAveragesAndCounts = averagesAndCounts.filter('count > 10')

# Join the results with book names, ordered by average rating (descending)
BooksAvgRatings = popularAveragesAndCounts.join(df_BookNames,'ISBN').orderBy(popularAveragesAndCounts['avg(BookRating)'].desc())

BooksAvgRatings.show(truncate=False)

+----------+-----+------------------+-----------------------------------------------------------------------------------------------------------------+
|ISBN      |count|avg(BookRating)   |BookTitle                                                                                                        |
+----------+-----+------------------+-----------------------------------------------------------------------------------------------------------------+
|0091842050|11   |9.181818181818182 |the blue day book a lesson in cheering yourself up                                                               |
|0316779059|11   |9.090909090909092 |the baby book everything you need to know about your baby from birth to age two                                  |
|8478886451|13   |8.384615384615385 |harry potter y el cã¡liz de fuego                                                                                |
|0836213122|16   |8.375             |theres treasure everywherea calvin and hobbes colle
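As a worked example of the arithmetic behind the cell above, the following plain-Python sketch computes the count and average per ISBN, keeps books with more than 10 ratings, and sorts by average in descending order. The ISBN labels `A`, `B`, `C` and their ratings are made up for illustration.

```python
from collections import defaultdict

# Hypothetical (ISBN, BookRating) pairs
ratings = [("A", 9)] * 11 + [("B", 10)] * 10 + [("C", 8)] * 6 + [("C", 0)] * 6

# Group ratings by ISBN (the groupBy step)
by_isbn = defaultdict(list)
for isbn, rating in ratings:
    by_isbn[isbn].append(rating)

# (ISBN, count, average) per book, like the joined counts and averages
summary = [(isbn, len(vals), sum(vals) / len(vals)) for isbn, vals in by_isbn.items()]

# Same threshold as filter('count > 10'): exactly 10 ratings is not enough
popular = [row for row in summary if row[1] > 10]

# Highest average first
popular.sort(key=lambda row: row[2], reverse=True)
print(popular)
```

Book `B` has the highest average but only 10 ratings, so it is dropped. Book `C` shows that ratings of 0 are included in the average, exactly as in the Spark query.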

###**Generating CSV files for each dataframe**

Deleting any files in the "folderPath" to ensure the folder is empty

In [76]:
# Path of the output folder for the table files
folderPath = '/content/drive/MyDrive/TCBDA/Big Data/Big Data Project/results'

In [77]:
import os
import glob

filesInFolder = folderPath + '/*.*'

fileList = glob.glob(filesInFolder)

for filePath in fileList:
    try:
        os.remove(filePath)
    except OSError:
        print("Error while deleting file : ", filePath)


fileList = glob.glob(filesInFolder)
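One detail of the `'/*.*'` pattern: it only matches names that contain a dot, so a file such as the `_SUCCESS` marker that Spark writes next to its part files is left in place. The sketch below demonstrates this in a temporary directory and shows a `'*'` variant that removes every visible regular file. The file names are placeholders, and the variant is only a suggestion, not what the notebook does.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as folder:
    # Create one file with a dot in its name and one without
    for name in ("part-00000.csv", "_SUCCESS"):
        with open(os.path.join(folder, name), "w") as handle:
            handle.write("placeholder\n")

    # Pattern used in the notebook: requires a dot in the file name
    for path in glob.glob(os.path.join(folder, "*.*")):
        os.remove(path)
    left_after_dotted_pattern = sorted(os.listdir(folder))

    # Broader pattern: every visible entry, restricted to regular files
    for path in glob.glob(os.path.join(folder, "*")):
        if os.path.isfile(path):
            os.remove(path)
    left_after_star_pattern = sorted(os.listdir(folder))

print(left_after_dotted_pattern)
print(left_after_star_pattern)
```

Neither pattern matches hidden names that start with a dot, so listing the folder afterwards is still a useful check.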

In [78]:
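# coalesce(1) makes each DataFrame write a single part file; header=true adds a header row.
# All three DataFrames are appended to the same folder, so the part files sit side by side.
Fact_Ratings.coalesce(1).write.mode("append").option("header", "true").csv(folderPath)
Dim_Users.coalesce(1).write.mode("append").option("header", "true").csv(folderPath)
Dim_Books.coalesce(1).write.mode("append").option("header", "true").csv(folderPath)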

In [79]:
spark.stop()