# Projekt Apache Spark

# Wprowadzenie

Wykorzystując ten notatnik jako szablon zrealizuj projekt Apache Spark zgodnie z przydzielonym zestawem. 

Kilka uwag:

* Nie modyfikuj ani nie usuwaj paragrafów *markdown* w tym notatniku, chyba że wynika to jednoznacznie z instrukcji. 
* Istniejące paragrafy zawierające *kod* uzupełnij w razie potrzeby zgodnie z instrukcjami
    - nie usuwaj ich
    - nie usuwaj zawartych w nich instrukcji oraz kodu
    - nie modyfikuj ich, jeśli instrukcje jawnie tego nie nakazują
* Możesz dodawać nowe paragrafy zarówno zawierające kod jak i komentarze dotyczące tego kodu (markdown)

# Treść projektu

Poniżej w paragrafie markdown wstaw tytuł przydzielonego zestawu

# Zestaw 4 – imdb-persons

# Działania wstępne 

Utworzenie kontekstów

In [None]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.getOrCreate()

In [None]:
sc = spark.sparkContext

W poniższym paragrafie uzupełnij polecenia definiujące poszczególne zmienne. 

Pamiętaj abyś:

* w późniejszym kodzie, dla wszystkich cześci projektu, korzystał z tych zdefiniowanych zmiennych. Wykorzystuj je analogicznie jak parametry
* przed ostateczną rejestracją projektu usunął ich wartości, tak aby nie pozostawiać w notatniku niczego co mogłoby identyfikować Ciebie jako jego autora

In [None]:
# pełna ścieżka do katalogu w zasobniku zawierającego podkatalogi `datasource1` i `datasource4` 
# z danymi źródłowymi
input_dir = 

Nie modyfikuj poniższych paragrafów. Wykonaj je i używaj zdefniowanych poniżej zmiennych jak parametrów Twojego programu.

In [None]:
# NIE ZMIENIAĆ
# ścieżki dla danych źródłowych 
datasource1_dir = input_dir + "/datasource1"
datasource4_dir = input_dir + "/datasource4"

# nazwy i ścieżki dla wyników dla misji głównej 
# część 1 (Spark Core - RDD) 
rdd_result_dir = "/tmp/output1"

# część 2 (Spark SQL - DataFrame)
df_result_table = "output2"

In [None]:
# NIE ZMIENIAĆ
import os
def remove_file(file):
    if os.path.exists(file):
        os.remove(file)

remove_file("metric_functions.py")
remove_file("tools_functions.py")

In [None]:
# NIE ZMIENIAĆ
import requests
r = requests.get("https://jankiewicz.pl/bigdata/metric_functions.py", allow_redirects=True)
open('metric_functions.py', 'wb').write(r.content)
r = requests.get("https://jankiewicz.pl/bigdata/tools_functions.py", allow_redirects=True)
open('tools_functions.py', 'wb').write(r.content)

In [None]:
# NIE ZMIENIAĆ
%run metric_functions.py
%run tools_functions.py

Poniższe paragrafy mają na celu usunąć ewentualne pozostałości poprzednich uruchomień tego lub innych notatników

In [None]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 1 (Spark Core - RDD) 
delete_dir(spark, rdd_result_dir)

In [None]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 2 (Spark SQL - DataFrame) 
drop_table(spark, df_result_table)

***Uwaga!***

Uruchom poniższy paragraf i sprawdź czy adres pod którym dostępny Apache Spark Application UI jest poprawny wywołując następny testowy paragraf. 

W razie potrzeby określ samodzielnie poprawny adres pod którym dostępny Apache Spark Application UI

In [None]:
# adres URL, pod którym dostępny Apache Spark Application UI (REST API)
# 
spark_ui_address = extract_host_and_port(spark, "http://localhost:4040")
spark_ui_address

In [None]:
# testowy paragraf
test_metrics = get_current_metrics(spark_ui_address)

In [None]:
## DODANE PRZEZ AUTORA
header_result = ['profession', 'primaryName', 'movies']

# Część 1 - Spark Core (RDD)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [None]:
# NIE ZMIENIAĆ
before_rdd_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź rozwiązanie *misji głównej* oparte na *RDD API*. 

Pamiętaj o wydajności Twojego przetwarzania, *RDD API* tego wymaga. 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

### Przygotowanie danych wyjściowych

In [None]:
ds1 = sc.textFile(f"{datasource1_dir}/*")

In [None]:
movies_data_rdd = ds1.map(lambda line: line.split("\t"))

In [None]:
ds4 = sc.textFile(f"{datasource4_dir}/*")

In [None]:
header = ds4.first()
persons_data_rdd = ds4.filter(lambda line: line != header).map(lambda line: line.split("\t"))

### Wyłuskanie pożądanych kolumn

In [None]:
movies_data_rdd = movies_data_rdd.map(lambda x: [x[0], x[2], x[3]])

In [None]:
persons_data_rdd = persons_data_rdd.map(lambda x: [x[0], x[1], x[4]])

### Wyznaczanie 4 najbardziej popularnych profesji

In [None]:
def map_person_professions(person):
    professions = person[-1]
    mapped_professions = []
    
    for profession in professions.split(','):
        if profession not in ('', 'miscellaneous'):
            mapped_professions.append((profession, 1))
    return mapped_professions

In [None]:
professions_sorted_by_popularity = persons_data_rdd.flatMap(map_person_professions).reduceByKey(lambda x, y: x + y)\
                                                                                    .sortBy(lambda entry: entry[1], False)

In [None]:
top4_popular_professions = tuple(key for key, _ in professions_sorted_by_popularity.take(4))
top4_popular_professions

### Filtrowanie wpisów filmów z pełną obsadą

In [None]:
movies_grouped = movies_data_rdd.map(lambda x: [x[0], x[1:]]).groupByKey().mapValues(list)

Funkcja do filtrowania filmów, dla każdego sprawdza czy ma 1.aktora/aktorkę 2.reżysera 3.dwie inne role

In [None]:
import logging as log

def is_fully_casted_rdd(x):
    movie_entries = list(x[1])
    has_actor, has_director, has_other1, has_other2 = False, False, False, False
    other_role = ''
    
    for entry in movie_entries:
        if has_actor and has_director and has_other1 and has_other2:
            return True;
           
        role = entry[1]

        if role == '':
            log.error("Invalid role!")
        
        if role in ('actor', 'actress','self'):
            has_actor = True
            continue
        elif role == 'director':
            has_director = True
            continue
        elif not has_other1:
            has_other1 = True
            other_role = role
            continue
        elif not has_other2 and role not in ('actor', 'actress', other_role):
            has_other2 = True
            continue
    return has_actor and has_director and has_other1 and has_other2

In [None]:
full_cast_movies_grouped = movies_grouped.filter(is_fully_casted_rdd)

In [None]:
full_cast_movies = full_cast_movies_grouped.flatMap(lambda x: [(value[0], value[1], x[0]) for value in x[1]])

### Filtrowanie wpisów filmów dla 4 najpopularniejszych profesji

In [None]:
full_cast_movies_of_top4_roles =  full_cast_movies.filter(lambda x: x[-2] in top4_popular_professions)

### Wyznaczanie 3 najbardziej zaangażowanych osób dla 4 najpopularniejszych profesji

In [None]:
persons_involvement = full_cast_movies_of_top4_roles.distinct().map(lambda x: [tuple(x[:-1]), 1]).reduceByKey(lambda x, y: x + y)\
                                                                                                .map(lambda x: [*x[0],x[1]]) #rozpakowanie klucza

In [None]:
top_3_persons_of_top4_roles_grouped = persons_involvement.groupBy(lambda x: x[1]) \
                                        .mapValues(lambda role_rows: sorted(role_rows, key= lambda x: x[2], reverse=True)[:3])

In [None]:
top_3_persons_of_top4_roles = top_3_persons_of_top4_roles_grouped.flatMap(lambda x: [(person[0], (x[0], person[2])) for person in x[1]])

### Odczytanie nazwisk najbardziej zaangażowanych i formatowanie wyników

In [None]:
person_id_name = persons_data_rdd.map(lambda x: x[:-1])

In [None]:
result = sc.parallelize([header_result]) + top_3_persons_of_top4_roles.leftOuterJoin(person_id_name)\
                                                    .map(lambda x: [x[1][0][0], x[1][1], x[1][0][1]])\
                                                    .sortBy(lambda x: (x[0],-x[-1]))

In [None]:
result.collect()

### Zapisanie wyniku

In [None]:
result.saveAsPickleFile(rdd_result_dir)

### Metryki

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [None]:
# NIE ZMIENIAĆ
after_rdd_metrics = get_current_metrics(spark_ui_address)

# Część 2 - Spark SQL (DataFrame)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [None]:
# NIE ZMIENIAĆ
before_df_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź rozwiązanie *misji głównej* swojego projektu oparte o *DataFrame API*. 

Pamiętaj o wydajności Twojego przetwarzania, *DataFrame API* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

### Załadowanie danych i wyłuskanie pożądanych kolumn

In [None]:
from pyspark.sql.functions import *
from pyspark.sql import Window

#### Flmy

In [None]:
ds1_df = spark.read \
    .option("header", "false") \
    .option("sep", "\t") \
    .option("inferSchema", "true") \
    .csv(datasource1_dir)

In [None]:
movies_df = ds1_df.select(col(ds1_df.columns[0]).alias('movieID'), col(ds1_df.columns[2]).alias('personID'), \
                          col(ds1_df.columns[3]).alias('role'))

#### Osoby

In [None]:
ds4_df = spark.read \
    .option("header", "True") \
    .option("sep", "\t") \
    .option("inferSchema", "true") \
    .csv(datasource4_dir)

In [None]:
persons_df = ds4_df.select(expr("nconst as personID"), expr("primaryName as name"), expr('primaryProfession as professions'))

### Wyznaczanie 4 najbardziej popularnych profesji

In [None]:
top4_popular_professions = persons_df.withColumn('profession', explode(split(col('professions'), ","))) \
                        .drop('professions') \
                        .filter(col('profession') != 'miscellaneous') \
                        .groupBy('profession').agg(count('*').alias('count')) \
                        .sort(col('count').desc()).select('profession').limit(4)
top4_professions_list = [row['profession'] for row in top4_popular_professions.collect()]

### Filtrowanie wpisów filmów z pełną obsadą

In [None]:
basic_roles = ( 'director', 'actor', 'actress', 'self' )

def is_df_fully_casted(roles):
        return array_contains(roles, basic_roles[0]) & \
            (array_contains(roles, basic_roles[1]) | array_contains(roles, basic_roles[2]) | array_contains(roles, basic_roles[3])) & \
            (size(array_except(roles,  array( [lit(x) for x in basic_roles] ) )) >= 2)

In [None]:
full_cast_movieIDs = movies_df.groupBy('movieID').agg(collect_set('role').alias('roles'))\
                                .filter(is_df_fully_casted(col('roles'))).drop('roles')

In [None]:
full_cast_movies = movies_df.join(full_cast_movieIDs, on="movieID", how="inner")

### Filtrowanie wpisów filmów dla 4 najpopularniejszych profesji

In [None]:
top4_prof_full_cast_movies = full_cast_movies.filter(col('role').alias('profession').isin( top4_professions_list ))

### Wyznaczanie 3 najbardziej zaangażowanych osób dla 4 najpopularniejszych profesji

In [None]:
movies_per_role_n_person = top4_prof_full_cast_movies.groupBy('role', 'personID').agg(count('*').alias('movies'))

In [None]:
role_window_spec = Window.partitionBy("role").orderBy(col("movies").desc())

ranked_df = movies_per_role_n_person.withColumn("rank", row_number().over(role_window_spec))

top3_persons_of_top4_roles = ranked_df.filter(col("rank") <= 3).drop("rank")

### Odczytanie nazwisk najbardziej zaangażowanych i formatowanie wyników

In [None]:
names_per_IDs = persons_df.drop('professions')

In [None]:
result = top3_persons_of_top4_roles.join(names_per_IDs, on='personID', how='inner').drop('personID').sort(col('role'), col('movies').desc())

In [None]:
result = result.select(col('role').alias(header_result[0]), col('name').alias(header_result[1]), col('movies'))
result.show(truncate=False)

In [None]:
result.write.saveAsTable(df_result_table)

### Metryki

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [None]:
# NIE ZMIENIAĆ
after_df_metrics = get_current_metrics(spark_ui_address)

# Analiza wyników i wydajności *misji głównych*

## Część 1 - Spark Core (RDD)

In [None]:
# Wczytanie wyników z pliku pickle
word_counts = sc.pickleFile(rdd_result_dir)

# Wyświetlenie 50 pierwszych elementów
result_sample = word_counts.take(50)
for item in result_sample:
    print(item)

In [None]:
subtract_metrics(after_rdd_metrics, before_rdd_metrics)

## Część 2 - Spark SQL (DataFrame)

In [None]:
df = spark.table(df_result_table)

# Wyświetlenie 50 pierwszych rekordów
df.show(50)

In [None]:
subtract_metrics(after_df_metrics, before_df_metrics)