# Lab 3

## Imports

In [None]:
import pyspark
from pyspark.sql.functions import unix_timestamp, current_timestamp, col, lit, split, datediff, floor, current_date, to_date, when, mean, expr, udf
from pyspark.sql.types import LongType
import numpy as np

## Setup spark session

In [None]:
# Create (or reuse) the Spark session for this notebook. The time parser policy
# is set to LEGACY for every date-parsing call made through this session.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('Lab3')
    .config('spark.sql.legacy.timeParserPolicy', 'LEGACY')
    .getOrCreate()
)

Spark server is accessible here (click on Spark UI):

In [None]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [None]:
import os

# Read the PYSPARK_PYTHON environment variable (None when it is not set);
# as the last expression of the cell, the value is shown as the cell output.
os.environ.get('PYSPARK_PYTHON')

## Define useful functions

In [None]:
def display_head(df: pyspark.sql.DataFrame, n: int):
    """Return the first ``n`` rows of a Spark DataFrame as a pandas DataFrame.

    The pandas result is what makes the preview look nice in the notebook.
    Only ``n`` rows are requested from Spark before converting to pandas.

    Args:
        df: Spark DataFrame to preview.
        n: Maximum number of rows to return.

    Returns:
        A pandas DataFrame holding at most ``n`` rows.
    """
    limited_df = df.limit(n)
    pandas_preview = limited_df.toPandas()
    return pandas_preview.head(n)

## Part 1

### Names.csv
- Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
- Dodaj kolumnę w której wyliczysz wzrost w stopach (feet)
- Odpowiedz na pytanie: jakie jest najpopularniejsze imię?
- Dodaj kolumnę i policz wiek aktorów
- Usuń kolumny (bio, death_details)
- Zmień nazwy kolumn - dodaj kapitalizację i usuń _
- Posortuj dataframe po imieniu rosnąco



In [None]:
# Load the names dataset: the first row is treated as the header and column
# types are inferred from the data.
names_df = spark.read.csv(
    path='../data/names.csv',
    header=True,
    inferSchema=True,
)
display_head(names_df, 10)

In [None]:
# Add the notebook execution time (unix timestamp) and the height in feet.
# NOTE(review): the 3.28 factor converts metres to feet; if `height` is stored
# in centimetres the factor should be 1 / 30.48 — TODO confirm the column's unit.
res_df = (
    names_df
    .withColumn("execution_time", unix_timestamp(current_timestamp()))
    .withColumn("height_feet", col('height') * lit(3.28))
)

display_head(res_df, 20)

In [None]:
# Rank first names by frequency. The first name is taken as the first
# space-separated token of `name`; the most frequent name comes first.
most_popular_names = (
    names_df
    .withColumn("first_name", split(col('name'), ' ').getItem(0))
    .groupBy('first_name')
    .count()
    .sort('count', ascending=False)
)

# Print the physical plan Spark will use for the ranking.
most_popular_names.explain()

In [None]:
# The ranking is sorted by descending count, so its first row is the most popular name.
# (A stray leading quote previously made this cell a syntax error.)
most_popular_name = most_popular_names.first()
print(f'Most popular name is {most_popular_name["first_name"]} with {most_popular_name["count"]} occurances')

In [None]:
# Normalise the birth/death dates and derive each person's age in whole years.
# Each date column is parsed with 'yyyy-MM-dd' first and 'dd.MM.yyyy' as a fallback;
# the helper columns used for the two attempts are dropped afterwards.
res_df = (
    res_df
    .withColumn('birth_date1', to_date(col('date_of_birth'), 'yyyy-MM-dd'))
    .withColumn('birth_date2', to_date(col('date_of_birth'), 'dd.MM.yyyy'))
    .withColumn('date_of_birth', when(col('birth_date1').isNotNull(), col('birth_date1')).otherwise(col('birth_date2')))
    .drop('birth_date1', 'birth_date2')
    # A person is flagged as dead when any death-related column is populated.
    .withColumn('is_dead', when(col('date_of_death').isNotNull() | col('place_of_death').isNotNull() | col('reason_of_death').isNotNull() | col('death_details').isNotNull(), 1).otherwise(0))
    .withColumn('death_date1', to_date(col('date_of_death'), 'yyyy-MM-dd'))
    .withColumn('death_date2', to_date(col('date_of_death'), 'dd.MM.yyyy'))
    .withColumn('date_of_death', when(col('death_date1').isNotNull(), col('death_date1')).otherwise(col('death_date2')))
    .drop('death_date1', 'death_date2')
    # date_of_death is already a date at this point, so it is not re-parsed with
    # a string pattern (the former extra to_date(..., 'dd.MM.yyyy') step was removed).
    # Age: death - birth for dead people, today - birth otherwise; dividing the
    # day difference by 365.25 approximates leap years. The result is null when
    # a required date is missing.
    .withColumn('age', when(col('is_dead') == 1, floor(datediff(col('date_of_death'), col('date_of_birth')) / 365.25))
                .otherwise(floor(datediff(current_date(), col('date_of_birth')) / 365.25)))
)

res_df.explain()

In [None]:
def to_camel_case(s: str):
    """Turn a snake_case identifier into capitalised words joined without underscores.

    Every underscore-separated word is passed through ``str.capitalize`` (first
    character upper-cased, the rest lower-cased), including the first word, so
    ``'date_of_birth'`` becomes ``'DateOfBirth'``.

    Args:
        s: Identifier to convert.

    Returns:
        The converted identifier with all underscores removed.
    """
    words = s.split('_')
    capitalised_words = [word.capitalize() for word in words]
    return ''.join(capitalised_words)

# Remove the free-text columns that are not needed any more.
res_df = res_df.drop('bio', 'death_details')

# Rename every remaining column in one pass (capitalised words, no underscores).
renamed_columns = [to_camel_case(column_name) for column_name in res_df.columns]
res_df = res_df.toDF(*renamed_columns)

# Order alphabetically by the (renamed) name column.
res_df = res_df.sort('Name', ascending=True)

display_head(res_df, 50)

## Part 2

### Movies.csv

* Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
* Dodaj kolumnę która wylicza ile lat upłynęło od publikacji filmu
* Dodaj kolumnę która pokaże budżet filmu jako wartość numeryczną, (trzeba usunac znaki walut)
* Usuń wiersze z dataframe gdzie wartości są null

In [None]:
# Load the movies dataset: the first row is treated as the header and column
# types are inferred from the data.
movies_df = spark.read.csv(
    path='../data/movies.csv',
    header=True,
    inferSchema=True,
)

display_head(movies_df, 50)

In [None]:
# Enrich the movies data with the execution time, the years since publication
# and a numeric budget, then drop every row that still contains a null.
res_df = (
    movies_df
    .withColumn("execution_time", unix_timestamp(current_timestamp()))
    # Parse date_published with full-date patterns first and a year-only
    # pattern as the last fallback; the helper columns are dropped afterwards.
    .withColumn('pub_date1', to_date(col('date_published'), 'yyyy-MM-dd'))
    .withColumn('pub_date2', to_date(col('date_published'), 'dd.MM.yyyy'))
    .withColumn('pub_date3', to_date(col('date_published'), 'yyyy'))
    .withColumn('date_published', when(col('pub_date1').isNotNull(), col('pub_date1')).otherwise(col('pub_date2')))
    .withColumn('date_published', when(col('date_published').isNotNull(), col('date_published')).otherwise(col('pub_date3')))
    .drop('pub_date1', 'pub_date2', 'pub_date3')
    # Whole years since publication; 365.25 days per year approximates leap years.
    .withColumn('years_from_published', floor(datediff(current_date(), col('date_published')) / 365.25))
    # Take the second space-separated token of `budget` and cast it to long so
    # the column is really numeric (it used to stay a string).
    # NOTE(review): assumes budget looks like '<currency> <amount>' — TODO confirm.
    # Values that cannot be cast become null and are removed by dropna() below.
    .withColumn('budget_numeric', split(col('budget'), ' ').getItem(1).cast('long'))
    .dropna()
)

display_head(res_df, 50)

## Part 3

### Ratings.csv

* Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
* Dla każdego z poniższych wyliczeń nie bierz pod uwagę `nulls`
* Dodaj nowe kolumny i policz mean i median dla wartości głosów (1 do 10)
* Dla każdej wartości mean i median policz jaka jest różnica między weighted_average_vote
* Kto daje lepsze oceny chłopcy czy dziewczyny dla całego setu
* Dla jednej z kolumn zmień typ danych do `long`

In [None]:
# Load the ratings dataset: the first row is treated as the header and column
# types are inferred from the data.
ratings_df = spark.read.csv(
    path='../data/ratings.csv',
    header=True,
    inferSchema=True,
)

display_head(ratings_df, 10)

In [None]:
# Stamp every row with the notebook execution time (unix timestamp) and
# discard rows that contain a null in any column.
res_df = (
    ratings_df
    .withColumn("execution_time", unix_timestamp(current_timestamp()))
    .dropna()
)

In [None]:
from pyspark.sql.types import DoubleType

# Names of the ten per-score vote columns: votes_1 ... votes_10.
col_names = [f'votes_{score}' for score in range(1, 11)]

# SQL expression averaging the ten columns: (votes_1 + ... + votes_10) / 10.
mean_expr = expr('(' + ' + '.join(col_names) + ') / 10')


def median(*values):
    """Return the median of the given values as a Python float (via numpy)."""
    return float(np.median(values))


# Wrap the Python function as a Spark UDF that returns a double.
median_udf = udf(median, DoubleType())

# Add the row-wise mean/median of the vote columns and their difference
# from weighted_average_vote.
res_df = (
    res_df
    .withColumn('votes1-10_mean', mean_expr)
    .withColumn('votes1-10median', median_udf(*col_names))
    .withColumn('diff_wa_mean1-10', col('weighted_average_vote') - col('votes1-10_mean'))
    .withColumn('diff_wa_median1-10', col('weighted_average_vote') - col('votes1-10median'))
)

display_head(res_df, 50)

In [None]:
# Column references reused by every comparison below.
female_avg_col = col('females_allages_avg_vote')
male_avg_col = col('males_allages_avg_vote')

# Count the rows where one group's average vote is higher, lower or equal.
female_votes_larger = ratings_df.filter(female_avg_col > male_avg_col).count()
male_votes_larger = ratings_df.filter(female_avg_col < male_avg_col).count()
votes_equal = ratings_df.filter(female_avg_col == male_avg_col).count()

# Overall mean of each group's average vote, collected to the driver as a list of rows.
mean_votes = ratings_df.agg(
    mean(female_avg_col).alias("female_avg_vote"),
    mean(male_avg_col).alias("male_avg_vote"),
).collect()

In [None]:
# The aggregation result is read from its first collected row.
overall_means = mean_votes[0]
female_avg_vote = overall_means['female_avg_vote']
male_avg_vote = overall_means['male_avg_vote']

In [None]:
# Report the comparison counts and the overall averages, one line each.
summary_lines = [
    f'Number of male larger average votes: {male_votes_larger}',
    f'Number of female larger average votes: {female_votes_larger}',
    f'Number of equal average votes: {votes_equal}',
    f'Male average vote: {male_avg_vote}',
    f'Female average vote: {female_avg_vote}',
]
print('\n'.join(summary_lines))

Therefore, we can conclude that women's votes are more favorable than men's.

In [None]:
# withColumn returns a new DataFrame and leaves ratings_df unchanged, so the
# result is bound to a name to keep the converted frame available. Ending the
# cell with the variable still shows its schema (total_votes as bigint) as output.
ratings_long_df = ratings_df.withColumn('total_votes', col('total_votes').cast(LongType()))
ratings_long_df

## Stop spark

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()

### Spark UI
- In the Jobs section we can see the state of executors and which jobs succeeded, failed or are running
- In Stages we can see the stages of execution of each job
- In the Storage section we can see RDDs and caches
- In Environment we can see the configuration of the Spark environment
- In Executors we get an overview of the attached executors
- In SQL/DataFrame we can see executed queries with their times