
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/201508_trip_data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", "true")
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

In [0]:
# Create a view or table

temp_table_name = "/FileStore/tables/201508_trip_data.csv"

df.createOrReplaceTempView("trip_data")


import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    explode,
    regexp_replace,
    regexp_extract,
    when,
    array_contains,
    avg,
    sum,
    countDistinct,
    isnan,
    split,
    expr,
    pandas_udf,
)
from pyspark.sql.types import DoubleType, StringType

# Zadanie 1

##Użycie poniższych funkcji: Nulls, fill, explode, drop, regexp_replace, regexp_extract, ifnull, nullIf, replace, array_contains. 

In [0]:
# a.	Użyj poniższe funkcje Nulls, fill, explode, drop, regexp_replace, regexp_extract, ifnull, nullIf, replace, array_contains.

# Nulls - sprawdzanie, czy kolumna zawiera wartości NULL


df.filter(col("Zip Code").isNull()).show()

In [0]:
# fill - Wypełnienie wartości NULL w danej kolumnie:

df.fillna({"Zip Code": "Brak danych"}).show()

In [0]:
# explode - jeśli mamy kolumne z listami i chcemy każdą wartość w osobnym wierszu
df_exploded = df.withColumn("Exploded_Start", explode(split(col("Start Date"), " ")))
display(df_exploded)

In [0]:
# a.	Użyj poniższe funkcje Nulls, fill, explode, drop, regexp_replace, regexp_extract, ifnull, nullIf, replace, array_contains.

# drop - sluzy do usuwania

df_dropped = df.drop("Exploded_Start")
display(df_dropped)

In [0]:
# regexp_replace - zamienianie wzorcow znakow

df_replaced = df.withColumn(
    "Start Station",
    regexp_replace(
        df["Start Station"], "San Antonio Shopping Center", "SA Shopping Center"
    ),
)

display(df_replaced)

In [0]:
# regexp_extract - pozwala wyciągnąć fragmenty tekstu pasujące do wyrażenia regularnego.
from pyspark.sql.functions import regexp_extract, col, coalesce, lit

df_reg = df.withColumn(
    "Extracted", regexp_extract(col("Start Station"), r"\bS\w*\b", 0)
)
df = df.withColumn(
    "Start Station", coalesce(col("Start Station"), lit("Unknown Station"))
)
df_reg.show()

In [0]:
# ifnull - jesli wartość w danej kolumnie jest NULL, zamienia ją na określoną wartośc
from pyspark.sql import functions as F


df_null = df.withColumn(
    "Start Station", F.coalesce(F.col("Start Station"), F.lit("Unknown Station"))
)
display(df_null)

In [0]:
#  nullIf - porównuje dwie wartości i zwraca null, jeśli te wartości są równe, w przeciwnym razie zwraca pierwszą wartość

df_nullif = df.withColumn("Duration", expr("nullIf(Duration, '444')"))
display(df_nullif)

In [0]:
#  replace - służy do zastępowania określonych wartości w DataFrame na inne
df_replaced = df.replace(
    "Spear at Folsom", "Commercial at Montgomery", subset=["Start Station"]
)
display(df_replaced)

In [0]:
# array_contains - służy do sprawdzania, czy w kolumnie zawierającej tablicę (array) znajduje się określona wartość

df_replaced = df_replaced.withColumn("end_array", split(col("End Station"), " "))
df_replaced = df_replaced.withColumn(
    "contains_San_Francisco", array_contains(col("end_array"), "San Francisco")
)


display(df_replaced)

## Użycie 3 funkcji agregujących 

In [0]:
from pyspark.sql.functions import avg, count, min, max

# 1 avg
df_avg_dur = df.groupBy("Start Station").agg(avg("Duration").alias("avg_dur"))
display(df_avg_dur)

# 2  count
df2 = df.groupBy("Bike #").agg(count("Bike #").alias("bike_count"))
display(df2)

# 3 - sum
df_sum_dur = df.groupBy("Start Station").agg(sum("Duration").alias("total_dur"))
display(df_sum_dur)

# Zadanie 2

## Funkcje UDF 

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

# funkcja do obliczania podatku
df_test = df.withColumn("Bike #", col("Bike #").cast("int"))


@pandas_udf(DoubleType())
def calculate_tax(dockcount: pd.Series) -> pd.Series:
    dockcount = pd.to_numeric(
        dockcount, errors="coerce"
    )  # Zmieniamy teksty na NaN, jeżeli nie uda się konwertować
    tax = dockcount.apply(lambda x: x * 0.05 if x > 20 else 2 if pd.notna(x) else 0)
    return tax


df_with_tax = df_test.withColumn("tax", calculate_tax(df_test["Bike #"]))
display(df_with_tax)

In [0]:
from pyspark.sql.types import BooleanType

# funkcja która sprawdza, czy w 'Start Station' znajduje się słowo 'San'
def contains_san(start_station: str) -> bool:
    if start_station and "San" in start_station:
        return True
    return False


# Rejestracja funkcji jako UDF
contains_san_udf = udf(contains_san, BooleanType())

# Dodanie nowej kolumny do DataFrame
df_with_san_flag = df.withColumn("contains_san", contains_san_udf(df["Start Station"]))

# Wyświetlenie wyniku
display(df_with_san_flag)