# DSCI 417 – Homework 04
**Lauren Forti**

In [0]:
# setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.getOrCreate()

## Load Diamond Data

In [0]:
# create schema
my_schema = 'carat DOUBLE, cut STRING, color STRING, clarity STRING, depth DOUBLE, table DOUBLE, price INTEGER, x DOUBLE, y DOUBLE, z DOUBLE'

# read file into df
diamonds = (
    spark.read
    .option('delimiter', '\t')
    .option('header', True)
    .schema(my_schema)
    .csv('/FileStore/tables/diamonds.txt')
)

# display schema
diamonds.printSchema()

## Part 1:  Grouping By Cut

In [0]:
# function to convert str to numeric
def rank_cut(level):
  ranks = {
    'Fair':1,
    'Good':2,
    'Very Good':3,
    'Premium':4,
    'Ideal':5
  }
  # return appropriate level
  return ranks.get(level)

# register as Spark UDF
spark.udf.register('rank_cut', rank_cut)

In [0]:
(
  diamonds
  
  # group by cut
  .groupBy('cut')
  
  .agg(
    # count diaamonds
    expr('count(*) as n_diamonds'),
    # calc avg price
    expr('int(round(avg(price),0)) as avg_price'),
    # calc avg carat
    expr('round(avg(carat), 2) as avg_carat'),
    # calc avg depth
    expr('round(avg(depth), 2) as avg_depth'),
    # calc avg table
    expr('round(avg(table), 2) as avg_table'),
  )

  # convert cut column to numeric
  .withColumn('cut', expr('rank_cut(cut)'))
  
  # sort by cut
  .sort('cut')

).show()

## Problem 2: Filtering based on Carat Size

In [0]:
# get count of diamonds within ranges
for item in range(0,6):
  # get bounds
  lower = item
  upper = item + 1
  
  # count diamonds within bounds
  ct = diamonds.filter((col('carat') >= lower) & (col('carat') < upper)).count()
  
  # output results
  print(f'The number of diamonds with carat size in range [{lower}, {upper}) is {ct}.')

## Problem 3: Binning by Carat Size

In [0]:
# function to find which bin diamonds fall into
def carat_bin(size):
  bins = ['[0, 1)', '[1, 2)', '[2, 3)', '[3, 4)', '[4, 5)', '[5, 6)']
  return bins[int(size)]

# register as UDF
spark.udf.register('carat_bin', carat_bin)

In [0]:
(
  diamonds
  
  # add new calcd column
  .select(
    '*',
    expr('carat_bin(carat) as carat_bin')
  )
  
  # group by carat_bin
  .groupBy('carat_bin')
  
  .agg(
    # get count of diamonds
    expr('count(*) as n_diamonds'),
    # calc avg price
    expr('int(round(avg(price),0)) as avg_price')
  )
  
  # sort by carat_bin asc
  .sort('carat_bin')

).show()

## Load IMDB Data

In [0]:
# read file into df
movies = (
    spark.read
    .option('delimiter', '\t')
    .option('header', True)
    .csv('/FileStore/tables/imdb/movies.txt')
)

# display schema
movies.printSchema()

In [0]:
# read file into df
names = (
    spark.read
    .option('delimiter', '\t')
    .option('header', True)
    .csv('/FileStore/tables/imdb/names.txt')
)

# display schema
names.printSchema()

In [0]:
# read file into df
title_principals = (
    spark.read
    .option('delimiter', '\t')
    .option('header', True)
    .csv('/FileStore/tables/imdb/title_principals.txt')
)

# display schema
title_principals.printSchema()

In [0]:
# read file into df
ratings = (
    spark.read
    .option('delimiter', '\t')
    .option('header', True)
    .csv('/FileStore/tables/imdb/ratings.txt')
)

# display schema
ratings.printSchema()

In [0]:
# output # of records in each of the four dfs
print('   Number of Records')
print('-'*24)
print('movies           ', movies.count())
print('names            ', names.count())
print('title_principals ', title_principals.count())
print('ratings          ', ratings.count())

## Problem 4: Number of Appearances by Actor

In [0]:
(
  title_principals
  
  # get only actors/actresses
  .filter((col('category') == 'actor') | (col('category') == 'actress'))
  
  # group by imdb ID for each person
  .groupBy('imdb_name_id')
  
  # get # of movies each person has appeared in
  .agg(
    expr('count(imdb_name_id) as appearances')
   )
  
  # combine with names df
  .join(other=names, on='imdb_name_id', how='left')
  
  # select only name and appearances cols
  .select('name', 'appearances')
  
  # sort desc by # of appearances
  .sort('appearances', ascending = False)
  
  # display the first 16 rows
  .show(16)
)

## Problem 5: Average Rating by Director

In [0]:
(
  title_principals
  
  # get only actors/actresses
  .filter(col('category') == 'director')
  
  # bring in ratings info
  .join(other=ratings, on='imdb_title_id', how='outer')
  
  # group by imdb ID for each person
  .groupBy('imdb_name_id')
  
  .agg(
    # get # of movies each person has directed
    expr('count(imdb_name_id) as num_films'),
    # get total votes for each director
    expr('int(sum(total_votes)) as total_votes'),
    # get avg rating for each director
    expr('round(avg(rating),2) as avg_rating')
  )
  
  # keep only directors whose movies have @ least 1 million votes  
  .filter(
    expr('total_votes >= 1000000')
  )

  # keep only directors who have directed at least 5 movies
  .filter(
    expr('num_films >= 5')
  )
  
  # bring in names df
  .join(other=names, on='imdb_name_id', how='left')
  
  # select cols
  .select('name', 'num_films', 'total_votes', 'avg_rating')
  
  # sort desc by # of appearances
  .sort('avg_rating', ascending = False)
  
  # display the first 16 rows
  .show(16, truncate=False)
)

## Problem 6: Actors Appearing in Horror Films

In [0]:
# filter df for horror movies
horror_films = movies.filter(expr('genre LIKE "%Horror%"'))

# display # of records
print(horror_films.count())

In [0]:
(
  title_principals
  
  # get only actors/actresses
  .filter((col('category') == 'actor') | (col('category') == 'actress'))
  
  # filter join with horror_films df
  .join(other=horror_films, on='imdb_title_id', how='semi')
  
  # group by imdb ID for each person
  .groupBy('imdb_name_id')
  
  .agg(
    # get # of movies each person has been in
    expr('count(imdb_name_id) as num_films'),    
  )

  # bring in names df
  .join(other=names, on='imdb_name_id', how='left')
  
  # select cols
  .select('name', 'num_films')
  
  # sort desc by # of appearances
  .sort('num_films', ascending = False)

  # display the first 16 rows
  .show(16)
)