#1. Query'owanie Danych w notebookach

In [0]:
%sql
select * from samples.nyctaxi.trips limit 100

## 2. Liczba podróży per ZIP

In [0]:
%sql
select
pickup_zip, 
count(pickup_zip) as RIDE_NUM  
from samples.nyctaxi.trips 
where tpep_dropoff_datetime > '2016-02-01'
group by pickup_zip 
order by RIDE_NUM desc

## 3. Cwiczenie: Querowanie danych

Zadanie:

**Potestuj pisanie SQL'owego kodu w Notebookach**
1. Pobierz wyświetl wszystkie kursy rozpoczęte w 2016 roku
2. policz liczbe kursów dla każdego dnia w styczniu 2016
3. policz sredni koszt kursu w 2016 roku

### Rozwiązanie zadanie 1

In [0]:
%sql
SELECT *
FROM samples.nyctaxi.trips
WHERE tpep_pickup_datetime >= '2016-01-01' AND tpep_pickup_datetime < '2017-01-01' 

### Rozwiazanie 2

In [0]:
%sql
SELECT DAYOFWEEK(tpep_pickup_datetime) AS day_of_week,
       COUNT(*) AS trip_count
FROM samples.nyctaxi.trips
WHERE tpep_pickup_datetime >= '2016-01-01' AND tpep_pickup_datetime < '2016-02-01'
GROUP BY DAYOFWEEK(tpep_pickup_datetime)
ORDER BY day_of_week;

### Rozwiazanie 3

In [0]:
%sql
SELECT AVG(fare_amount) AS avg_trip_cost
FROM samples.nyctaxi.trips
WHERE tpep_pickup_datetime >= '2016-01-01' AND tpep_pickup_datetime < '2017-01-01';


# 2. Pyspark

W databricks mamy dostępny obiekt `spark`, który jest natywnie dostępny w całym kodzie, jeżeli używamy języka jako Python. Jest to dostarczane przez cluster na którym pracujemy

In [0]:
spark.version

In [0]:
!python --version

In [0]:
dataframe = spark.sql("select * from samples.nyctaxi.trips limit 100") # to samo co w punkcie 1

Pamiętajmy, że Spark to język leniwy, wiec dopiero po komendzie akcji, coś się wydarzy

In [0]:
dataframe.show()

## Przykłady

Inicjalizacja tabeli (dataframe'u) - w skrócie df

In [0]:
from pyspark.sql import functions as F # pobieramy funkcje
df = spark.table("samples.nyctaxi.trips")

Select wybranych kolumn

In [0]:
dataframe.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount",
    "pickup_zip",
    "dropoff_zip"
).show(10)

filtrowanie

In [0]:
filtered_df = df.filter(
    (F.col("tpep_pickup_datetime") >= "2016-01-01") & 
    (F.col("tpep_pickup_datetime") < "2017-01-01")
)

filtered_df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount"
).show(10)


Count per dni tygodnia

In [0]:
grouped_df = filtered_df.filter(
    (F.col("tpep_pickup_datetime") >= "2016-01-01") & 
    (F.col("tpep_pickup_datetime") < "2016-02-01")
).withColumn(
    "day_of_week", F.dayofweek("tpep_pickup_datetime")
).groupBy("day_of_week").agg(
    F.count("*").alias("trip_count")
).orderBy("day_of_week")

grouped_df.show()


Srednia cena przejazdu w 2016 roku

In [0]:
# filtrowanie czasu
filtered_df = df.filter(
    (F.col("tpep_pickup_datetime") >= "2016-01-01") & 
    (F.col("tpep_pickup_datetime") < "2017-01-01")
)

# aggregacja ze srednia
avg_fare = filtered_df.agg(
    F.avg("fare_amount").alias("avg_fare_amount")
)

# Wyświetlenie średniej ceny - tutaj dopiero program się wykona
avg_fare.show()

In [0]:
type(avg_fare)

Przeniesienie wyniku do pamięci clustra - funkcja collect()

In [0]:
result = avg_fare.collect()
print(f"Średnia cena kursu w 2020 roku wynosi: {result}")

In [0]:
result[0]

In [0]:
avg_fare_value = result[0]["avg_fare_amount"]

In [0]:
type(avg_fare_value) # typ zmiennoprzecinkowy, zapamiętany w pamięci clustra

## Cwiczenia

1. Znajdź kursy, których odległość przekracza 25 mil
2. Znajdź najdłuższy kurs wedlug odleglosci - zwróc tylko jeden rząd
3. Oblicz liczbe kursów według godzin w ciągu dnia 


In [0]:
from pyspark.sql import functions as F # pobieramy funkcje
df = spark.table("samples.nyctaxi.trips")

### Podpowiedzi


2. komendy: `orderBy`, `desc` oraz `limit`
3. komendy: `F.hour`, `F.count`, `groupBy`

### Rozwiazanie 1

In [0]:
outliers = df.filter(F.col("trip_distance") > 25)

outliers.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount"
).show()

### Rozwiazanie 2

In [0]:
longest_trip = df.orderBy(F.col("trip_distance").desc()).limit(1)

longest_trip.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount"
).show()


### Rozwiazanie 3

In [0]:
hourly_trips = df.withColumn(
    "hour_of_day", F.hour("tpep_pickup_datetime")
).groupBy("hour_of_day").agg(
    F.count("*").alias("trip_count")
).orderBy("hour_of_day")

hourly_trips.show(24)


### Ćwiczenia dodatkowe

4. Oblicz średni koszt za milę dla wszystkich kursów (koszt = fare_amount, odległość = trip_distance)
5. Znajdź 5 najpopularniejszych kodów pocztowych, z których rozpoczynały się kursy (pickup_zip)
6. Oblicz czas trwania każdego kursu w minutach i znajdź najdłuższy.


#### Rozwiazania Dodatkowych ćwiczeń

In [0]:
# 4
avg_cost_per_mile = df.filter(F.col("trip_distance") > 0).withColumn(
    "cost_per_mile", F.col("fare_amount") / F.col("trip_distance")
).agg(
    F.avg("cost_per_mile").alias("avg_cost_per_mile")
)

avg_cost_per_mile.show()


In [0]:
# 5
popular_pickup_zips = df.groupBy("pickup_zip").agg(
    F.count("*").alias("trip_count")
).orderBy(F.col("trip_count").desc()).limit(5)

popular_pickup_zips.show()


In [0]:
# 6
longest_duration = df.withColumn(
    "duration_minutes", 
    (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")) / 60
).orderBy(F.col("duration_minutes").desc()).limit(1)

longest_duration.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "duration_minutes",
    "fare_amount"
).show()



chyba ktoś nie wyłączył czasu ;) 