Names.csv 
* Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
* Dodaj kolumnę w której wyliczysz wzrost w stopach (feet)
* Odpowiedz na pytanie jakie jest najpopularniesze imię?
* Dodaj kolumnę i policz wiek aktorów 
* Usuń kolumny (bio, death_details)
* Zmień nazwy kolumn - dodaj kapitalizaję i usuń _
* Posortuj dataframe po imieniu rosnąco

In [0]:
%run "./3. Pobierz Dane"

In [0]:
filePath = "dbfs:/FileStore/tables/Files/names.csv"
namesDf = spark.read.format("csv") \
              .option("header","true") \
              .option("inferSchema","true") \
              .load(filePath) 

#display(namesDf)

In [0]:
#sprawdzamy plan wykonania 
namesDf.explain()


In [0]:
#Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch

from pyspark.sql.functions import unix_timestamp, current_timestamp
namesDf=namesDf.withColumn("execution_time", unix_timestamp(current_timestamp()))
#display(namesDf)

In [0]:
#Dodaj kolumnę w której wyliczysz wzrost w stopach (feet)
from pyspark.sql.functions import col, round

namesDf=namesDf.withColumn("height_in_feet", round(col("height")/30.48 , 2))
#display(namesDf)

In [0]:
#Odpowiedz na pytanie jakie jest najpopularniesze imię?
from pyspark.sql.functions import count

most_freq_name=namesDf.groupBy("name").agg(count("*").alias("count")) \
  .orderBy(col("count").desc()) \
    .first()
most_freq_name['name']


In [0]:
#Dodaj kolumnę i policz wiek aktorów
#jak data smierci i urodzenia jest dostepna otherwise null 
from pyspark.sql.functions import when, col, datediff, floor

namesDf = namesDf.withColumn(
    "age",
    when(col("date_of_birth").isNotNull() & col("date_of_death").isNotNull(),
         floor(datediff(col("date_of_death"), col("date_of_birth")) / 365))
    .otherwise(None)
)

#display(namesDf)

In [0]:
#Usuń kolumny (bio, death_details)
namesDf = namesDf.drop("bio", "death_details")


In [0]:
#Zmień nazwy kolumn - dodaj kapitalizaję i usuń _

new_columns = [col(c).alias(c.title().replace("_", "")) for c in namesDf.columns]
namesDf = namesDf.select(*new_columns)

#namesDf.columns


In [0]:
#Posortuj dataframe po imieniu rosnąco
namesDf = namesDf.orderBy("Name")
#display(namesDf)
#namesDf.orderBy("Name").explain()

Movies.csv
* Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
* Dodaj kolumnę która wylicza ile lat upłynęło od publikacji filmu
* Dodaj kolumnę która pokaże budżet filmu jako wartość numeryczną, (trzeba usunac znaki walut)
* Usuń wiersze z dataframe gdzie wartości są null

In [0]:
filePath = "dbfs:/FileStore/tables/Files/movies.csv"
moviesDf = spark.read.format("csv") \
              .option("header","true") \
              .option("inferSchema","true") \
              .load(filePath) \

#display(moviesDf)

In [0]:
moviesDf.explain()

In [0]:
#Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
moviesDf = moviesDf.withColumn("execution_time", unix_timestamp(current_date()))
#display(moviesDf)

In [0]:
#Dodaj kolumnę która wylicza ile lat upłynęło od publikacji filmu
from pyspark.sql.functions import to_date, year, current_date, col, when
from pyspark.sql.types import StringType

moviesDf = moviesDf.withColumn(
    "parsed_date",
    when(col("date_published").rlike(r'^\d{4}$'), to_date(col("date_published"), "yyyy"))  # Format yyyy
    .when(col("date_published").rlike(r'^\d{2}\.\d{2}\.\d{4}$'), to_date(col("date_published"), "dd.MM.yyyy"))  # Format dd.MM.yyyy
    .otherwise(to_date(col("date_published"), "yyyy-MM-dd"))  # Inne przypadki np. yyyy-MM-dd
)

# Dodanie kolumny z wyliczeniem lat, które upłynęły od publikacji filmu
moviesDf = moviesDf.withColumn(
    "years_since_published",
    year(current_date()) - year(col("parsed_date"))
)

moviesDf=moviesDf.drop('parsed_date')

#display(moviesDf)

In [0]:
#Dodaj kolumnę która pokaże budżet filmu jako wartość numeryczną, (trzeba usunac znaki walut)

from pyspark.sql.functions import regexp_replace
moviesDf = moviesDf.withColumn(
    "budget_numeric",
    regexp_replace(col("budget"), r'[^\d]', '').cast('int')
)
#display(moviesDf)

In [0]:
#Usuń wiersze z dataframe gdzie wartości są null
moviesDf = moviesDf.dropna()
#display(moviesDf)

ratings.csv
* Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
* Dla każdego z poniższych wyliczeń nie bierz pod uwagę `nulls` 
* Kto daje lepsze oceny chłopcy czy dziewczyny dla całego setu
* Dla jednej z kolumn zmień typ danych do `long` 

In [0]:
filePath = "dbfs:/FileStore/tables/Files/ratings.csv"
ratingsDf = spark.read.format("csv") \
              .option("header","true") \
              .option("inferSchema","true") \
              .load(filePath) \

#display(ratingsDf)

In [0]:
#Dodaj kolumnę z wartością czasu wykonania notatnika w formacie epoch
ratingsDf=ratingsDf.withColumn("execution_time", unix_timestamp(current_timestamp()))
#display(ratingsDf)

In [0]:
#Kto daje lepsze oceny chłopcy czy dziewczyny dla całego setu
from pyspark.sql.functions import mean

ratingsDf_noNull=ratingsDf.dropna(subset=["males_allages_avg_vote", "females_allages_avg_vote"])

avg_male=ratingsDf_noNull.select(mean("males_allages_avg_vote")).collect()[0][0]
avg_female=ratingsDf_noNull.select(mean("females_allages_avg_vote")).collect()[0][0]

if avg_male>avg_female:
  print("Lepsze oceny daja chlopcy\n")
else:
  print("Lepsze oceny daja dziewczyny\n")


In [0]:
#Dla jednej z kolumn zmień typ danych do long
ratingsDf=ratingsDf.withColumn("total_votes_long", col("total_votes").cast("long"))
#display(ratingsDf)


In [0]:
"""
ZADANIE 2
Spark UI

Jobs – Zakładka ta zawiera listę wszystkich uruchomionych zadań, wraz z ich unikalnymi identyfikatorami, opisami, datą wykonania oraz czasem trwania. W tym miejscu można również znaleźć informacje o liczbie etapów, które zakończyły się sukcesem, oraz o liczbie zadań, które zostały wykonane pomyślnie. Dodatkowo dostępny jest Event Timeline, który pokazuje wykonanie zadań w czasie, umożliwiając śledzenie postępu aplikacji.

Stages – W tej sekcji znajdują się szczegółowe informacje dotyczące poszczególnych etapów aplikacji, w tym identyfikatory etapów, ich opisy, czas trwania oraz status sukcesu. Można tu sprawdzić, które etapy zakończyły się pomyślnie, a które napotkały błędy.

Storage – Ta zakładka pokazuje dane dotyczące przechowywania danych w pamięci. Oferuje wgląd w dane strukturalne, takie jak DataFrame, RDD czy inne struktury danych przechowywane w pamięci podręcznej. Można tu również zobaczyć ilość danych przechowywanych w pamięci przez aplikację.

Environment – W tej sekcji znajdują się informacje o środowisku Spark, w tym szczegóły dotyczące wersji Javy, Scali oraz zmiennych środowiskowych. Dzięki tym informacjom użytkownicy mogą lepiej zrozumieć konfigurację systemu, na którym działa aplikacja Spark.

Executors – Ta zakładka prezentuje dane o wykonawcach zadań (executors), umożliwiając porównanie wykorzystania pamięci przez poszczególnych wykonawców, czasu ich pracy oraz ilości przetworzonych danych. Informacje te pozwalają na monitorowanie efektywności wykonawców i pomagają w optymalizacji zadań. Dodatkowo, dostępne są informacje o wykonawcach aktywnych oraz tych, którzy zakończyli swoje zadania.

SQL/DataFrame – Sekcja ta wyświetla zapytania SQL, które zostały wykonane w ramach aplikacji, wraz z czasem ich wykonania, identyfikatorem oraz opisem. Dzięki temu użytkownicy mogą śledzić i analizować zapytania SQL oraz DataFrame, co pozwala na lepsze zrozumienie i optymalizację zapytań.

JDBC/ODBC Server – Zakładka ta pokazuje statystyki dotyczące sesji JDBC/ODBC, w tym informacje o wykonanych zapytaniach SQL. Może być przydatna do monitorowania zapytań wykonywanych z zewnętrznych źródeł i baz danych.

Structured Streaming – W tej sekcji prezentowane są informacje dotyczące zapytań streamingowych. Umożliwia to śledzenie postępu w czasie rzeczywistym, a także monitorowanie statusu zapytań streamingowych w trakcie ich wykonywania.


"""


In [0]:
#ZADANIE 3
moviesDf.explain(True)


In [0]:
from pyspark.sql import functions as F

groupedDf = moviesDf.groupBy("genre").agg(F.avg("avg_vote").alias("average_vote"))

groupedDf.explain(True)