# Projekt Apache Spark

# Wprowadzenie

Wykorzystując ten notatnik jako szablon zrealizuj projekt Apache Spark zgodnie z przydzielonym zestawem. 

Kilka uwag:

* Nie modyfikuj ani nie usuwaj paragrafów *markdown* w tym notatniku, chyba że wynika to jednoznacznie z instrukcji. 
* Istniejące paragrafy zawierające *kod* uzupełnij w razie potrzeby zgodnie z instrukcjami
    - nie usuwaj ich
    - nie usuwaj zawartych w nich instrukcji oraz kodu
    - nie modyfikuj ich, jeśli instrukcje jawnie tego nie nakazują
* Możesz dodawać nowe paragrafy zarówno zawierające kod jak i komentarze dotyczące tego kodu (markdown)

# Treść projektu

## Zestaw 4 – imdb-persons

# Zestaw 0 – wzorzec

**Uwaga**

- W ramach wzorca nie są spełnione żadne reguły projektu. 
- Brak konsekwencji w wykorzystaniu właściwego API w ramach poszczególnych części
- Zadanie *misji głównej* polega na zliczeniu słówek.  

# Działania wstępne 

Uruchom poniższy paragraf, aby utworzyć obiekty kontekstu Sparka. Jeśli jest taka potrzeba dostosuj te polecenia. Pamiętaj o potrzebnych bibliotekach.

In [None]:
!sudo /opt/conda/miniconda3/bin/mamba install delta-spark==3.0.0

In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Spark session & context
builder = SparkSession.builder \
    .appName("zestaw4") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "5g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "2") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

sc = spark.sparkContext

W poniższym paragrafie uzupełnij polecenia definiujące poszczególne zmienne. 

Pamiętaj abyś:

* w późniejszym kodzie, dla wszystkich cześci projektu, korzystał z tych zdefiniowanych zmiennych. Wykorzystuj je analogicznie jak parametry
* przed ostateczną rejestracją projektu usunął ich wartości, tak aby nie pozostawiać w notatniku niczego co mogłoby identyfikować Ciebie jako jego autora

In [3]:
# pełna ścieżka do katalogu w zasobniku zawierającego podkatalogi `datasource1` i `datasource4` 
# z danymi źródłowymi
input_dir = "/home/jovyan/data"

Nie modyfikuj poniższych paragrafów. Wykonaj je i używaj zdefniowanych poniżej zmiennych jak parametrów Twojego programu.

In [4]:
# NIE ZMIENIAĆ
# ścieżki dla danych źródłowych 
datasource1_dir = input_dir + "/datasource1"
datasource4_dir = input_dir + "/datasource4"

# nazwy i ścieżki dla wyników dla misji głównej 
# część 1 (Spark Core - RDD) 
rdd_result_dir = "/tmp/output1"

# część 2 (Spark SQL - DataFrame)
df_result_table = "output2"

# część 3 (Pandas API on Spark)
ps_result_file = "/tmp/output3.json"

In [4]:
# NIE ZMIENIAĆ
import os
def remove_file(file):
    if os.path.exists(file):
        os.remove(file)

remove_file("metric_functions.py")
remove_file("tools_functions.py")

In [None]:
# NIE ZMIENIAĆ
import requests
r = requests.get("https://jankiewicz.pl/bigdata/metric_functions.py", allow_redirects=True)
open('metric_functions.py', 'wb').write(r.content)
r = requests.get("https://jankiewicz.pl/bigdata/tools_functions.py", allow_redirects=True)
open('tools_functions.py', 'wb').write(r.content)

In [6]:
# NIE ZMIENIAĆ
%run metric_functions.py
%run tools_functions.py

Poniższe paragrafy mają na celu usunąć ewentualne pozostałości poprzednich uruchomień tego lub innych notatników

In [None]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 1 (Spark Core - RDD) 
delete_dir(spark, rdd_result_dir)

In [None]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 2 (Spark SQL - DataFrame) 
drop_table(spark, df_result_table)

In [9]:
# NIE ZMIENIAĆ
# usunięcie miejsca docelowego dla część 3 (Pandas API on Spark) 
remove_file(ps_result_file)

In [5]:
# NIE ZMIENIAĆ
spark

***Uwaga!***

Uruchom poniższy paragraf i sprawdź czy adres, pod którym dostępny *Apache Spark Application UI* jest poprawny wywołując następny testowy paragraf. 

W razie potrzeby określ samodzielnie poprawny adres, pod którym dostępny *Apache Spark Application UI*

In [None]:
# adres URL, pod którym dostępny Apache Spark Application UI (REST API)
# 
spark_ui_address = extract_host_and_port(spark, "http://localhost:4041")
spark_ui_address

In [None]:
# testowy paragraf
test_metrics = get_current_metrics(spark_ui_address)
test_metrics

# Część 1 - Spark Core (RDD)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [13]:
# NIE ZMIENIAĆ
before_rdd_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** *misji głównej* oparte na *RDD API*. 

Pamiętaj o wydajności Twojego przetwarzania, *RDD API* tego wymaga. 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [14]:
# load by textFile
datasource1 = sc.textFile(datasource1_dir).map(lambda x: x.split("\t"))
datasource4 = sc.textFile(datasource4_dir).map(lambda x: x.split("\t"))

# map and normalize to performer
ds1_rdd = datasource1.map(
    lambda x: (
        x[0],
        x[2],
        x[3],
        "performer" if x[3] in {"actor", "actress", "self"} else x[3],
    )
)  # (tconst, nconst, role, role_normalized)

ds4_rdd = datasource4.map(
    lambda x: (x[0], x[1], x[4])
)  # (nconst, primaryName, primaryProfession)

In [15]:
# map-reduce-by-key to create sets of staff per movie, apply filters
grouped_roles = (
    ds1_rdd.map(lambda x: (x[0], {x[3]}))  # Start with a set for roles
    .reduceByKey(lambda roles1, roles2: roles1.union(roles2))  # Merge sets of roles per key
    .filter(lambda x: {"performer", "director"}.issubset(x[1]) and len(x[1]) > 3)  # Apply filters
)
full_cast_movies = grouped_roles.map(lambda x: (x[0], "")) # for join

# get only full-cast roles
full_cast_roles = full_cast_movies.join(ds1_rdd.map(lambda x: (x[0], (x[1], x[2]))))
full_cast_roles_count = full_cast_roles.\
    map(lambda x: ((x[1][1][0], x[1][1][1]), 1)).reduceByKey(lambda a,b: a+b) # ((nconst, profession), movies)

In [None]:
# filter actor profession data
actor_data = ds4_rdd.flatMap(
    lambda row: [
        (profession, 1)
        for profession in row[2].split(",")
        if profession and profession != "miscellaneous"
    ]
)

# calculate the top4 professions, filter for the best 4
top_professions = actor_data.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
top4_professions = top_professions.zipWithIndex().filter(lambda x: x[1] < 4).map(lambda x: x[0])

In [None]:
full_cast_roles_count_tx = full_cast_roles_count.map(lambda x: (x[0][1], (x[0][0], x[1])))

# movies per person are joined by join and not by filtering full_cast_roles_count_tx with top_professions array to not go beyond the API
movies_per_person = top4_professions.\
    join(full_cast_roles_count_tx).\
    map(lambda x: (x[0], x[1][1][0], x[1][1][1], x[1][0]))

In [None]:
# ranking and sorting, get top3 actors per category 
ranked = movies_per_person.groupBy(lambda x: x[0]).\
    mapValues(lambda rows: sorted(rows, key=lambda x: x[2], reverse=True)[:3]).\
    flatMap(lambda x: x[1])
    
semi_final_result = ranked.map(lambda x: (x[1], (x[0], x[2], x[3]))).\
    join(ds4_rdd.map(lambda x: (x[0], x[1]))).map(lambda x: (x[1][0][0], x[1][1], x[1][0][1], x[1][0][2]))
    
final_result_sorted = semi_final_result.sortBy(lambda x: (-x[3], -x[2])).map(lambda x: (x[0], x[1], x[2]))

In [16]:
# write to pickle
final_result_sorted.saveAsPickleFile(rdd_result_dir)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [17]:
# NIE ZMIENIAĆ
after_rdd_metrics = get_current_metrics(spark_ui_address)

# Część 2 - Spark SQL (DataFrame)

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [18]:
# NIE ZMIENIAĆ
before_df_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** *misji głównej* swojego projektu oparte o *DataFrame API*. 

Pamiętaj o wydajności Twojego przetwarzania, *DataFrame API* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [19]:
from pyspark.sql.functions import col, explode, split, count, desc
# loading data
datasource1 = spark.read.option("sep", "\t").csv(datasource1_dir, inferSchema=True)
datasource4 = spark.read.option("sep", "\t").csv(datasource4_dir, header=True, inferSchema=True)
datasource1 = datasource1.toDF("tconst", "ordering", "nconst", "role", "job", "characters")
datasource4 = datasource4.toDF("nconst", "primaryName", "birthYear", "deathYear", "primaryProfession", "knownForTitles")

In [20]:
from pyspark.sql.functions import collect_set, size, array_contains, row_number, when
from pyspark.sql import Window

# normalize roles to performer
normalized_roles = datasource1.withColumn(
    "normalized_role",
    when(col("role").isin("actor", "actress", "self"), "performer").otherwise(col("role"))
)

# aggregate roles to find movies with at least 4 staff members, at least one performer and director
full_cast = normalized_roles.groupBy("tconst").agg(collect_set("normalized_role").alias("roles")).\
    filter(
        array_contains(col("roles"), "performer") & 
        array_contains(col("roles"), "director") & 
        (size(col("roles")) > 3)
    ).select("tconst")



In [None]:
# select only roles from full-cast movies
full_cast_roles = full_cast.join(datasource1, "tconst").select("tconst", "nconst", "role")
full_cast_roles_count = full_cast_roles.groupBy("nconst", "role").agg(count("tconst").alias("movies")).\
    select("nconst", col("role").alias("profession"), "movies")

In [None]:
# filter professions
actor_data = datasource4.withColumn("profession", explode(split(col("primaryProfession"), ","))).\
    filter(col("profession") != "miscellaneous")
# best professions
top_professions = actor_data.groupBy("profession").agg(count("nconst").alias("count")).orderBy(desc("count")).limit(4)

In [None]:
# movies per person are joined by join and not by filtering full_cast_roles_count with top_professions array to not go beyond the API
movies_per_person = full_cast_roles_count.join(datasource4.select("nconst", "primaryName"), "nconst").join(top_professions, "profession")
# partition by profession
window_spec = Window.partitionBy("profession").orderBy(desc("movies"))
ranked_movies_per_person2 = movies_per_person.withColumn(
    "rank", row_number().over(window_spec)
)
final_result = ranked_movies_per_person2.filter(col("rank") <= 3).\
    select("profession", "primaryName", "movies").orderBy("profession", desc("movies"))


In [21]:
# save to delta table
final_result.write.mode("overwrite").format("delta").saveAsTable(df_result_table)

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [22]:
# NIE ZMIENIAĆ
after_df_metrics = get_current_metrics(spark_ui_address)

# Część 3 - Pandas API on Spark

Ta część to wyzwanie. W szczególności dla osób, które nie programują na co dzień w Pythonie, lub które nie nie korzystały do tej pory z Pandas API.  

Powodzenia!

## Misje poboczne

W ponizszych paragrafach wprowadź swoje rozwiązania *misji pobocznych*, o ile **nie** chcesz, aby oceniana była *misja główna*. W przeciwnym przypadku **KONIECZNIE** pozostaw je **puste**.  

## Misja główna 

Poniższy paragraf zapisuje metryki przed uruchomieniem Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [23]:
#NIE ZMIENIAĆ
before_ps_metrics = get_current_metrics(spark_ui_address)

W poniższych paragrafach wprowadź **rozwiązanie** swojego projektu oparte o *Pandas API on Spark*. 

Pamiętaj o wydajności Twojego przetwarzania, *Pandas API on Spark* nie jest w stanie wszystkiego "naprawić". 

Nie wprowadzaj w poniższych paragrafach żadnego kodu, w przypadku wykorzystania *misji pobocznych*.

In [None]:
import pyspark.pandas as ps

# Loading data
datasource1 = ps.read_csv(datasource1_dir, sep="\t", header=None)
datasource1.columns = ["tconst", "ordering", "nconst", "role", "job", "characters"]

datasource4 = ps.read_csv(datasource4_dir, sep="\t", header=0)
datasource4.columns = ["nconst", "primaryName", "birthYear", "deathYear", "primaryProfession", "knownForTitles"]

# Normalize roles to "performer"
normalized_roles = datasource1.assign(
    normalized_role=datasource1["role"].apply(
        lambda x: "performer" if x in ["actor", "actress", "self"] else x
    )
)

# Filter for full-cast movies
def filter_roles(roles):
    return "performer" in roles and "director" in roles and len(roles) > 3

roles_grouped = normalized_roles.groupby("tconst").agg({"normalized_role": "collect_set"}).reset_index()
roles_grouped.columns = ["tconst", "roles"]
full_cast_movies = roles_grouped[roles_grouped["roles"].apply(filter_roles)][["tconst"]]

# Select roles from full-cast movies
full_cast_roles = full_cast_movies.merge(datasource1, on="tconst")[["tconst", "nconst", "role"]]
full_cast_roles_count = full_cast_roles.groupby(["nconst", "role"]).agg({"tconst": "count"}).reset_index()
full_cast_roles_count.columns = ["nconst", "profession", "movies"]

# Filter professions
actor_data = datasource4.assign(
    profession=datasource4["primaryProfession"].apply(lambda x: x.split(",") if isinstance(x, str) else [])
).explode("profession")
actor_data = actor_data[actor_data["profession"] != "miscellaneous"]

# Determine top professions
top_professions = actor_data.groupby("profession").agg({"nconst": "count"}).reset_index()
top_professions.columns = ["profession", "count"]
top_professions = top_professions.sort_values("count", ascending=False).head(4)

# Combine roles with top professions
movies_per_person = (
    full_cast_roles_count
    .merge(datasource4[["nconst", "primaryName"]], on="nconst", how="inner")
    .merge(top_professions[["profession"]], on="profession", how="inner")
)

# get top3s
movies_per_person_top3 = movies_per_person.groupby("profession").apply(
    lambda group: group.sort_values(by=["movies"], ascending=[False]).head(3)
)

persons_dropped = movies_per_person_top3.reset_index(drop=True)
persons_to_json = persons_dropped[["profession", "primaryName", "movies"]]


In [None]:
df = persons_to_json.head(50).to_pandas()

In [28]:
df.to_json(ps_result_file, orient='index')

Poniższy paragraf zapisuje metryki po uruchomieniu Twojego rozwiązania *misji głównej*. 

Nie musisz go uruchamiać podczas implementacji rozwiązania.

In [30]:
#NIE ZMIENIAĆ
after_ps_metrics = get_current_metrics(spark_ui_address)

# Analiza wyników i wydajności *misji głównych*

## Część 1 - Spark Core (RDD)

In [None]:
# Wczytanie wyników z pliku pickle
word_counts = sc.pickleFile(rdd_result_dir)

# Wyświetlenie 50 pierwszych elementów
result_sample = word_counts.take(50)
for item in result_sample:
    print(item)

In [None]:
subtract_metrics(after_rdd_metrics, before_rdd_metrics)

## Część 2 - Spark SQL (DataFrame)

In [None]:
df = spark.table(df_result_table)

# Wyświetlenie 50 pierwszych rekordów
df.show(50)

In [None]:
subtract_metrics(after_df_metrics, before_df_metrics)

## Część 3 - Pandas API on Spark

In [None]:
import json

# Odczytaj zawartość pliku JSON
with open(ps_result_file, 'r') as file:
    json_content = json.load(file)

# Wyświetl zawartość
print(json.dumps(json_content, indent=2))

In [None]:
subtract_metrics(after_ps_metrics, before_ps_metrics)