In [31]:
import boto3
import os
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import countDistinct, explode, split, concat_ws, collect_list, isnan
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')

# STEP 1: Get the params of the created redshift cluster 
- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that give access to Redshift to read from S3

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['KEY']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['SECRET']

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [16]:
# FILL IN THE REDSHIFT ENPOINT HERE
# e.g. DWH_ENDPOINT="redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com" 
DWH_ENDPOINT="dwhcluster.ci2m6m74tbzm.us-west-2.redshift.amazonaws.com" 
    
#FILL IN THE IAM ROLE ARN you got in step 2.2 of the previous exercise
#e.g DWH_ROLE_ARN="arn:aws:iam::988332130976:role/dwhRole"
DWH_ROLE_ARN="arn:aws:iam::264680862608:role/dwhRole"

# Step 2: Explore and Assess the Data

### Part 1: Load Data from S3 and clean dataframe

In [17]:
%load_ext sql

In [18]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.ci2m6m74tbzm.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [19]:
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

s3bucket =  s3.Bucket("udacity-input") # private

s3_data = iter(s3bucket.objects.filter(Prefix="ml-latest-small"))
for _ in range(5): print(next(s3_data))


s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/Awards.txt')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/links.csv')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/movies.csv')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/ratings.csv')


In [20]:
movieSchema = R([
            Fld("movieId",Int()),
            Fld("title",Str()),
            Fld("genres",Str())
            ])

In [7]:
ratingSchema = R([
            Fld("userId",Int()),
            Fld("movieId",Int()),
            Fld("rating",Dbl()),
            Fld("ts",Str())
            ])

In [8]:
tagSchema = R([
            Fld("userId",Int()),
            Fld("movieId",Int()),
            Fld("tag",Str()),
            Fld("ts",Str())
            ])

In [21]:
# read movies, ratings, and tags csv
dfmovies = spark.read.csv("s3a://udacity-input/ml-latest-small/movies.csv", header=True, schema=movieSchema)
dfratings = spark.read.csv("s3a://udacity-input/ml-latest-small/ratings.csv", header = True, schema=ratingSchema)
dftags = spark.read.csv("s3a://udacity-input/ml-latest-small/tags.csv", header = True, schema=tagSchema)

In [120]:
# read awards txt
dfawards = spark.read.option("header", "true") \
    .option("delimiter", "|") \
    .option("inferSchema", "true") \
    .csv("s3a://udacity-input/ml-latest-small/Awards.txt")

dfawards.show(10, truncate=False)

+--------------------------------------------------------+--------+----------+-----------+
|Film                                                    |Year    |Awards    |Nominations|
+--------------------------------------------------------+--------+----------+-----------+
|Parasite                                                |2019    |4         |6.0        |
|Ford v Ferrari                                          |2019    |2         |4.0        |
|Learning to Skateboard in a Warzone (If You're a Girl)  |2019    |1         |1.0        |
|The Neighbors' Window                                   |2019    |1         |1.0        |
|Little Women                                            |2019    |1         |6.0        |
|Marriage Story                                          |2019    |1         |6.0        |
|Jojo Rabbit                                             |2019    |1         |6.0        |
|Toy Story 4                                             |2019    |1         |2.0        |

In [None]:
# read award_corrected txt
dfawards2 = spark.read.option("header", "true") \
    .option("delimiter", "|") \
    .option("inferSchema", "true") \
    .csv("s3a://udacity-input/ml-latest-small/Award_corrected.txt")

dfawards2.show(10, truncate=False)

In [155]:
dfmovies.printSchema()
dfmovies.show(5, truncate = False)
dfmovies.count()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



9742

In [147]:
dfratings = dfratings.withColumn(
    "rate_time",
    F.to_timestamp(F.from_unixtime((col("ts") / 1000) , 'yyyy-MM-dd HH:mm:ss.SSS')).cast("Timestamp")
).drop("ts")

In [148]:
dfratings.printSchema()
dfratings.show(5)
dfratings.count()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- rate_time: timestamp (nullable = true)

+------+-------+------+-------------------+
|userId|movieId|rating|          rate_time|
+------+-------+------+-------------------+
|     1|      1|   4.0|1970-01-12 04:03:02|
|     1|      3|   4.0|1970-01-12 04:03:01|
|     1|      6|   4.0|1970-01-12 04:03:02|
|     1|     47|   5.0|1970-01-12 04:03:03|
|     1|     50|   5.0|1970-01-12 04:03:02|
+------+-------+------+-------------------+
only showing top 5 rows



100836

In [153]:
dftags = dftags.withColumn("tag_time", F.to_timestamp(col("ts") / 1000)).drop("ts")

In [154]:
dftags.printSchema()
dftags.show(5)
dftags.count()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- tag_time: timestamp (nullable = true)

+------+-------+---------------+--------------------+
|userId|movieId|            tag|            tag_time|
+------+-------+---------------+--------------------+
|     2|  60756|          funny|1970-01-17 17:35:...|
|     2|  60756|Highly quotable|1970-01-17 17:35:...|
|     2|  60756|   will ferrell|1970-01-17 17:35:...|
|     2|  89774|   Boxing story|1970-01-17 17:35:...|
|     2|  89774|            MMA|1970-01-17 17:35:...|
+------+-------+---------------+--------------------+
only showing top 5 rows



3683

In [121]:
dfawards.columns

['Film   ', 'Year   ', 'Awards    ', 'Nominations']

In [122]:
dfawards = dfawards.withColumn("title", dfawards['Film   '].cast(Str())).drop('Film   ')
dfawards = dfawards.withColumn("year", dfawards['Year   '].cast(Date())).drop("Year   ")
dfawards = dfawards.withColumn("awards", dfawards['Awards    '].cast(Dbl())).drop("Awards    ")
dfawards = dfawards.withColumn("nominations", dfawards['Nominations'].cast(Int()))
dfawards = dfawards.withColumn("year", F.year("year"))

In [134]:
dfawards2 = dfawards2.withColumn("title", dfawards2['Film   '].cast(Str())).drop('Film   ')
dfawards2 = dfawards2.withColumn("year", dfawards2['Year   '].cast(Date())).drop("Year   ")
dfawards2 = dfawards2.withColumn("awards", dfawards2['Awards    '].cast(Dbl())).drop("Awards    ")
dfawards2 = dfawards2.withColumn("nominations", dfawards2['Nominations'].cast(Int()))
dfawards2 = dfawards2.withColumn("year", F.year("year"))

In [135]:
dfawards.printSchema()
dfawards.show(5)
dfawards.count()
dfawards2.count()

root
 |-- nominations: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- awards: double (nullable = true)

+-----------+--------------------+----+------+
|nominations|               title|year|awards|
+-----------+--------------------+----+------+
|          6|        Parasite    |2019|   4.0|
|          4|    Ford v Ferrari  |2019|   2.0|
|          1|Learning to Skate...|2019|   1.0|
|          1|The Neighbors' Wi...|2019|   1.0|
|          6|    Little Women    |2019|   1.0|
+-----------+--------------------+----+------+
only showing top 5 rows



1316

In [136]:
dfawards.printSchema()
dfawards.show(5)
dfawards.count()

root
 |-- nominations: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- awards: double (nullable = true)

+-----------+--------------------+----+------+
|nominations|               title|year|awards|
+-----------+--------------------+----+------+
|          6|        Parasite    |2019|   4.0|
|          4|    Ford v Ferrari  |2019|   2.0|
|          1|Learning to Skate...|2019|   1.0|
|          1|The Neighbors' Wi...|2019|   1.0|
|          6|    Little Women    |2019|   1.0|
+-----------+--------------------+----+------+
only showing top 5 rows



1316

### Part 2: Explore the Data

### Data Wrangling with DataFrames
Identify data quality issues, like missing values, duplicate data, etc.

In [None]:
# check for null values
dfmovies.columns.value_counts()

#### How to deal with null values
#### 1. Deleting Rows 
This method commonly used to handle the null values. Here, we either delete a particular row if it has a null value for a particular feature and a particular column if it has more than 70-75% of missing values. This method is advised only when there are enough samples in the data set. 
#### 2. Replacing With Mean/Median/Mode  
This strategy can be applied on a feature which has numeric data like the age of a person or the rating score. We can calculate the mean, median or mode of the feature and replace it with the missing values. This is an approximation which can add variance to the data set. 
#### 3. Assigning An Unique Category  
A categorical feature will have a definite number of possibilities, such as gender, for example. Since they have a definite number of classes, we can assign another class for the missing values like unknown.
#### 4. Predicting The Missing Values  
Using the features which do not have missing values, we can predict the nulls with the help of a machine learning algorithm. 
#### 5. Using Algorithms Which Support Missing Values  
KNN is a machine learning algorithm which works on the principle of distance measure. This algorithm can be used when there are nulls present in the dataset. While the algorithm is applied, KNN considers the missing values by taking the majority of the K nearest values. 

In [None]:
# check duplicate data, also confirm the dataset is on which level
dfmovies.shape
dfmovies[['movieId']].drop_duplicates().shape

In [None]:
dfratings.shape
dfratings[['movieId', 'userId']].drop_duplicates().shape

In [None]:
dfawards.shape
dfawards[['movieId', 'year']].drop_duplicates().shape

In [None]:
# basic count
# number of movies in the dataset
distinct_movie = dfmovies['movieId'].nunique()
print('{} movies in the movies dataset'.format(distinct_movie))

In [None]:
# number of users in the dataset
distinct_user = dfratings['userId'].nunique()
print('{} users rated the movies'.format(distinct_user))

In [None]:
# number of movies receiving awards
distinct_award = dfawards['title'].nunique()
print('{} movies received awards'.format(distinct_award))

In [None]:
# Minimum number of ratings per user
# Minimum number of ratings per movie 
tmp1 = dfratings.groupBy("userID").count().toPandas()['count'].min()
tmp2 = dfratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [None]:
# number of movies rated by only one user
tmp1 = sum(dfratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = dfratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

In [None]:
# check the dates in the ratings dataset
tmp1 = dfratings['rate_time'].min()
tmp2 = dfratings['rate_time'].max()
print('ratings were made during {} and {}'.format(tmp1, tmp2))

In [137]:
# split the mixed genres by '|'
dfmovies2 = dfmovies.withColumn('genre', explode(split(dfmovies.genres, '\|')))

In [None]:
dfmovies2.show(11)

In [139]:
# use the dataframe dfmovies2 produced to match every movie to a single genre
genre_movies = dfmovies2 \
                    .groupBy(dfmovies2.genre) \
                    .agg(concat_ws(',', collect_list(dfmovies2.title)) \
                    .alias('title')) \
                    .orderBy('genre')

DataFrame[genre: string, title: string]

In [None]:
# use the dataframe dfmovies2 produced to count movie in a single genre
genre_movies = dfmovies2 \
                    .groupBy(dfmovies2.genre) \
                    .sum(dfmovies2.title) \
                    .alias('count')) \
                    .orderBy('genre')

In [142]:
genre_movies.show()

+------------------+--------------------+
|             genre|               title|
+------------------+--------------------+
|(no genres listed)|La cravate (1957)...|
|            Action|Heat (1995),Sudde...|
|         Adventure|Toy Story (1995),...|
|         Animation|Toy Story (1995),...|
|          Children|Toy Story (1995),...|
|            Comedy|Toy Story (1995),...|
|             Crime|Heat (1995),Casin...|
|       Documentary|Nico Icon (1995),...|
|             Drama|Waiting to Exhale...|
|           Fantasy|Toy Story (1995),...|
|         Film-Noir|Devil in a Blue D...|
|            Horror|Dracula: Dead and...|
|              IMAX|Apollo 13 (1995),...|
|           Musical|Pocahontas (1995)...|
|           Mystery|Copycat (1995),Ci...|
|           Romance|Grumpier Old Men ...|
|            Sci-Fi|Powder (1995),Cit...|
|          Thriller|Heat (1995),Golde...|
|               War|Richard III (1995...|
|           Western|Desperado (1995),...|
+------------------+--------------

In [145]:
# pick distinct genre
genres_dummies = dfmovies2.filter(dfmovies2.genre != '(no genres listed)') \
                     .select(dfmovies2.genre).distinct() \
                     .orderBy(dfmovies2.genre)

DataFrame[genre: string]

In [146]:
genres_dummies.show()

+-----------+
|      genre|
+-----------+
|     Action|
|  Adventure|
|  Animation|
|   Children|
|     Comedy|
|      Crime|
|Documentary|
|      Drama|
|    Fantasy|
|  Film-Noir|
|     Horror|
|       IMAX|
|    Musical|
|    Mystery|
|    Romance|
|     Sci-Fi|
|   Thriller|
|        War|
|    Western|
+-----------+



### Data Wrangling with Spark SQL and OLAP

In [None]:
dfratings.createOrReplaceTempView("ratings")     #userid, movieid, rating, timestamp
dfmovies.createOrReplaceTempView("movies")       #movieid, title, genres
dflinks.createOrReplaceTempView("links")         #movieid, imdbId, tmdbId
dftags.createOrReplaceTempView("tags")           #userid, movieid, tag, timestamp
dfawards.createOrReplaceTempView("awards")       #title, year, awards, nominations

In [None]:
# Split title and release year in separate columns in movies dataframe. Convert year to timestamp.       
movies = spark.sql("select movieId, substr(title, 0, length(title)-7) as title, substr(title, -5, 4) as year from movies")
movies.show()
movies.createOrReplaceTempView("movies") 

In [None]:
# number of movies not rated
spark.sql("""select 
          count(distinct movies.movieId)
          from movies 
          where movies.movieId not in
          (select distinct ratings.movieId from ratings)
          """).show()

In [None]:
# the top 5 movies with high ratings
spark.sql("""select distinct
    m.title as title,
    avg(r.rating) as avg_rating
    from movies as m join ratings as r on m.movieId = r.movieId
    group by m.title
    order by avg_rating desc
    limit 5
""").show()

In [None]:
# the most awards a movie got
spark.sql("""select distinct
                    title,
                    sum(awards) as tot_awards
                    from awards
                    group by title
                    order by tot_awards desc
                    limit 1
""").show()

In [None]:
# the rating score of movie with highest awards

In [None]:
# year of awards in the dataset
spark.sql("""select 
             min(year) as min_year,
             max(year) as max_year
             from awards
""").show()

# STEP 3: Connect to the Redshift Cluster

### Part 1: Extract data and transform into fact and dimension tables

In [None]:
%%sql
DROP TABLE IF EXISTS dimDate
CREATE TABLE dimDate
(
  date_key timestamp NOT NULL PRIMARY KEY,
  year smallint NOT NULL,
  month smallint NOT NULL,
  day smallint NOT NULL,
  week smallint NOT NULL,
  weekday varchar(3) NOT NULL
);

DROP TABLE IF EXISTS dimRating
CREATE TABLE dimRating
(
  userId             smallint NOT NULL PRIMARY KEY,
  movieId            smallint NOT NULL,
  rating             numeric NOT NULL,
  rate_time          timestamp REFERENCES dimdate (date_key)
);

DROP TABLE IF EXISTS dimGenres
CREATE TABLE dimGenres
(
  genreId            smallint NOT NULL PRIMARY KEY
  genres             text NOT NULL,
  title              varchar(45) NOT NULL
);

DROP TABLE IF EXISTS factMovies
CREATE TABLE factMovies
(
  MovieId      smallint NOT NULL PRIMARY KEY,
  title        varchar(45) NOT NULL,
  release_year year NOT NULL,
  awards       smallint NOT NULL,
  Nominations  smallint NOT NULL
)



In [None]:
%%sql
INSERT INTO dimDate (date_key, date, year, month, day, week, dow)
SELECT DISTINCT(TO_CHAR(timestamp :: DATE, 'yyyyMMDD')::integer) AS date_key,
       date(timestamp)                                           AS date,
       EXTRACT(year FROM timestamp)                              AS year,
       EXTRACT(month FROM timestamp)                             AS month,
       EXTRACT(day FROM timestamp)                               AS day,
       EXTRACT(week FROM timestamp)                              AS week,
       dayofweek(timestamp)                                      AS dow
FROM ratings;

In [None]:
%%sql
INSERT INTO factMovies (movieId, title, release_year, awards, nominations)
SELECT DISTINCT(movieId)                                         AS movieId,
       title                                                     AS title,
       year                                                      AS release_year,
       awards                                                    AS awards,
       nominations                                               AS nominations
FROM movies as m LEFT JOIN awards as a on upper(trim(m.title)) = upper(trim(a.title));

In [None]:
%%sql
INSERT INTO dimRatings (userId, movieId, rating, rate_time)
SELECT * FROM ratings

In [None]:
%%sql
INSERT INTO dimGenres (genreId, genre, title)


In [None]:
song_field = ["title", "duration", "year", "artist_id"]
songs_table = df.select(song_field).dropDuplicates().withColumn("song_id", F.monotonically_increasing_id()).filter(~col("year").isin([0]) & col("year").isNotNull() & col("artist_id").isNotNull())

In [None]:
artist_field = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
artists_table = df.select(artist_field).dropDuplicates().dropna(subset=["artist_id","artist_name"])

In [None]:
songs_table.write.partitionBy("year", "artist_id").parquet("s3a://sparkifytest/songs/", mode="overwrite")

In [None]:
artists_table.write.parquet("s3a://sparkifytest/artists/", mode="overwrite")

In [None]:
user_field = [" userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
users_table = df.selectExpr(user_field).dropDuplicates().dropna(how = "any")

In [None]:
users_table.write.parquet("s3a://sparkifytest/users/", mode="overwrite")

In [None]:
time_table.write.partitionBy("year", "month").parquet("s3a://sparkifytest/time/", mode="overwrite")

In [None]:
song_df = spark.read.parquet("s3a://sparkifytest/songs/*/*/*")
artist_df = spark.read.parquet("s3a://sparkifytest/artists/*")

In [None]:
Join_song = df.join(song_df, ((song_df.title == df.song) & (song_df.duration == df.length)))
artists_songs_logs = Join_song.join(artist_df, (Join_song.artist == artist_df.artist_name))
songplays = artists_songs_logs.join(time_table, (artists_songs_logs.start_time == time_table.start_time), 'left').drop(artists_songs_logs.start_time)

In [None]:
songplays_field = ["start_time", "userId as user_id", "level", "song_id", "artist_id", "sessionid as session_id", "artist_location as location", "userAgent as user_agent", "year", "month"]

In [None]:
songplays_table = songplays.selectExpr(songplays_field).dropDuplicates().dropna(subset=["user_id", "artist_id", \
"song_id"]).withColumn("songplay_id", F.monotonically_increasing_id())

In [None]:
songplays_table.write.partitionBy("year", "month").parquet(output_date + "songplays/", mode="overwrite")
 

In [None]:
songplays_table.createOrReplaceTempView("songplays")

spark.sql("""
    SELECT month, count(song_id) as song_num
    FROM songplays
    GROUP by month
    order by song_num desc
""").show()