In [0]:
%run "./3. Pobierz Dane"


In [0]:
filePath = "dbfs:/FileStore/tables/Files/201508_station_data.csv"
display(dbutils.fs.ls("/FileStore/tables/"))
station_data = spark.read.format("csv").option("header","true").option("inferSchema","true").load(filePath)

display(station_data)

In [0]:
from pyspark.sql.functions import col, expr, regexp_replace, regexp_extract, when, array_contains, explode, avg, count, sum

# 1. Nulls - Sprawdzenie brakujących wartości
station_data.select([count(when(col(c).isNull(), c)).alias(c) for c in station_data.columns]).show()

# 2. fill - Uzupełnienie wartości null domyślną wartością
station_data_filled = station_data.fillna({"dockcount": "0"})

# 3. explode - Przykładowe zastosowanie na kolumnie zawierającej listy
df_array = station_data.withColumn("landmark_array", expr("split(landmark, ' ')")).select("station_id", explode(col("landmark_array")).alias("landmark_word"))
df_array.show()

# 4. drop - Usunięcie kolumny "long"
df_dropped = station_data.drop("long")
df_dropped.show()

# 5. regexp_replace - Zamiana przecinków na średniki w nazwie stacji
df_replaced = station_data.withColumn("name", regexp_replace(col("name"), ",", ";"))
df_replaced.show()

# 6. regexp_extract - Wyodrębnienie tylko pierwszego słowa z kolumny "name"
df_extracted = station_data.withColumn("first_word", regexp_extract(col("name"), "(\\w+)", 1))
df_extracted.show()

# 7. ifnull - Zastąpienie wartości null inną wartością
df_ifnull = station_data.withColumn("dockcount", when(col("dockcount").isNull(), 0).otherwise(col("dockcount")))
df_ifnull.show()

# 8. nullIf - Zastąpienie wartości "San Jose" wartością null
df_nullif = station_data.withColumn("landmark", expr("nullif(landmark, 'San Jose')"))
df_nullif.show()

# 9. replace - Zamiana wartości "San Jose" na "SJ"
df_replaced_city = station_data.withColumn("landmark", regexp_replace(col("landmark"), "San Jose", "SJ"))
df_replaced_city.show()

# 10. array_contains - Sprawdzenie, czy "landmark" zawiera "San"
df_contains = station_data.withColumn("contains_San", array_contains(expr("split(landmark, ' ')").cast("array<string>"), "San"))
df_contains.show()

In [0]:
# Funkcje agregujące
# 1. Średnia liczba stacji (dockcount)
station_data.select(avg("dockcount").alias("avg_dockcount")).show()

# 2. Liczba stacji w każdym mieście
station_data.groupBy("landmark").agg(count("station_id").alias("station_count")).show()

# 3. Suma dockcount dla każdego miasta
station_data.groupBy("landmark").agg(sum("dockcount").alias("total_dockcount")).show()


In [0]:
# Funkcje UDF

from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import IntegerType, StringType
import pandas as pd

# zwiększanie dockcount o 10%
def increase_dockcount(dockcount):
    return int(dockcount * 1.1) if dockcount is not None else None

    @pandas_udf(StringType())
def uppercase_name(name: pd.Series) -> pd.Series:
    return name.str.upper()

station_data = station_data.withColumn("dockcount_adjusted", increase_dockcount_udf(station_data.dockcount))
station_data = station_data.withColumn("name_upper", uppercase_name(station_data.name))

display(station_data)

[0;36m  File [0;32m<command-1880178919371002>:12[0;36m[0m
[0;31m    def uppercase_name(name: pd.Series) -> pd.Series:[0m
[0m                                                     ^[0m
[0;31mIndentationError[0m[0;31m:[0m unexpected unindent
