## we create a specific notebook to update athletes because we apply unique transformations to it

No parametrization is needed here, as there is only one file to process.

In [0]:
# Load the athletes dataset (Parquet) from the "bronze" container into a Spark DataFrame.
df = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@olympicsprogectstorage.dfs.core.windows.net/athletes")
)


In [0]:
# Preview the DataFrame exactly as loaded, before any transformation.
df.display()

### replace missing values with "Unknown"

In [0]:
# Replace nulls in the place/country columns with the literal "Unknown".
df = df.fillna(
    {
        place_col: "Unknown"
        for place_col in (
            "birth_place",
            "residence_place",
            "birth_country",
            "residence_country",
        )
    }
)

In [0]:
# Preview the DataFrame after the null replacement.
df.display()

### filter rows based on some condition

In [0]:
from pyspark.sql.functions import *

# Keep only rows where `current` is true AND `name` is one of the three listed
# athletes. Two chained where() calls are combined with a logical AND.
df_filtered = (
    df
    .where(col('current') == True)
    .where(col('name').isin(['GALSTYAN Slavik', 'HARUTYUNYAN Arsen', 'SEHEN Sajjad']))
)


In [0]:
# Show the rows that passed the filter.
df_filtered.display()

### change data type

In [0]:
# Cast both body-measurement columns to float in a single chained expression.
df = (
    df
    .withColumn("height", col("height").cast("float"))
    .withColumn("weight", col("weight").cast("float"))
)

In [0]:
# Preview the DataFrame after the type casts.
df.display()

#### if height and weight are 0 it is better to set them to null

In [0]:
# Replace zero height/weight values with null. Each non-zero value is kept
# as-is; when() without otherwise() yields null for every row that does not
# match, so zeros (and values that were already null) end up null.
df = (
    df
    .withColumn("height", when(col("height") != 0.0, col("height")))
    .withColumn("weight", when(col("weight") != 0.0, col("weight")))
)

### sort rows by height (descending), then by weight (ascending)

In [0]:
# Order by height descending, breaking ties by weight ascending.
df_sorted = df.orderBy(col('height').desc(), col('weight').asc())

In [0]:
# Show the sorted result.
display(df_sorted)

#### replace values with regular expression

In [0]:
# Rewrite every occurrence of "United States" inside `nationality` as "US".
# The pattern matches substrings, so longer values containing it change too.
df_sorted = df_sorted.withColumn(
    "nationality",
    regexp_replace("nationality", "United States", "US"),
)

#### group by + aggregation + alias

In [0]:
# Count rows per `code` and show only the codes that occur more than once.
display(
    df_sorted
    .groupBy('code')
    .agg(count(col('code')).alias('total_count'))
    .where(col('total_count') > 1)
)

### rename a column

In [0]:
# Rename the `code` column to `athlete_id`, then preview the result.
df_sorted = df_sorted.withColumnRenamed(existing='code', new='athlete_id')
df_sorted.display()

Databricks visualization. Run in Databricks to view.

#### split a string column

In [0]:
# Turn the comma-separated `occupation` string into an array column.
df_sorted = df_sorted.withColumn(
    'occupation',
    split(col('occupation'), ','),
)



### select only certain columns

In [0]:
# Project the subset of columns kept in the final DataFrame, in this order.
df_final = df_sorted.select(
    'athlete_id',
    'current',
    'name',
    'name_short',
    'name_tv',
    'gender',
    'function',
    'country_code',
    'country',
    'country_long',
    'nationality',
    'nationality_long',
    'nationality_code',
    'height',
    'weight',
)

#### window function with sum and partition by

In [0]:
from pyspark.sql.window import Window

# Sum of `weight` per nationality, computed with a window function.
# The frame runs from the first to the last row of each partition, so every
# row receives its partition's total (it is not a running sum, despite the
# column name). `sum` here is the Spark aggregate, not the Python builtin.
display(
    df_final.withColumn(
        "cum_weight",
        sum("weight").over(
            Window
            .partitionBy("nationality")
            .orderBy("height")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        ),
    )
)

### HOW TO MOVE TO SQL BY SAVING A DF IN SQL

In [0]:
# Register the PySpark DataFrame as a temporary view named "df_final" so that
# the same data can be queried from SQL cells.
df_final.createOrReplaceTempView(name="df_final")

In [0]:
%sql
-- SQL version of the PySpark window cell: sum of weight per nationality,
-- with a frame that spans the whole partition.
SELECT
  df_final.*,
  SUM(weight) OVER (
    PARTITION BY nationality
    ORDER BY height
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  ) AS cum_weight
FROM df_final

### WRITE THE FINAL DF AS A DELTA FILE IN SILVER FOLDER IN AZURE

In [0]:
# Overwrite the Delta table olympicspariscatalog.silver.athletes, storing its
# data at the given path in the "silver" container.
(
    df_final.write
    .format("delta")
    .mode("overwrite")
    .option("path", "abfss://silver@olympicsprogectstorage.dfs.core.windows.net/athletes")
    .saveAsTable("olympicspariscatalog.silver.athletes")
)