# **Ad-hoc analysis over the lake**

## Query data from files

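NOTE: The trip data can be downloaded from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page. The first cell below expects the January 2022 yellow taxi file at Files/raw/trips/yellow_tripdata_2022-01.parquet.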

**HINT:** Drag a file from the lakehouse explorer and drop it into a notebook cell to generate the code that loads it into a DataFrame.<br/><br/>
<img src="https://learn.microsoft.com/en-us/fabric/data-engineering/media/author-execute-notebook/drag-drop-insert-snippet.gif" width="800px">

In [2]:
# Load one month of yellow taxi trips (Parquet) and expose it to Spark SQL as a temp view
df = spark.read.parquet("Files/raw/trips/yellow_tripdata_2022-01.parquet")
df.createOrReplaceTempView("tmpVwTrip")

# Load the payment type lookup (CSV with a header row; column types are inferred)
df1 = spark.read.format("csv").option("header","true").option("inferSchema", "true").load("Files/raw/payment_type/payment_type.csv")
df1.createOrReplaceTempView("tmpVwPaymentType")

spark.sql("SELECT COUNT(*) FROM tmpVwTrip").show()

+--------+
|count(1)|
+--------+
| 2463931|
+--------+



In [3]:
%%sql
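-- COUNT(t.payment_type) counts matched trips only; COUNT(*) would also count the NULL row a LEFT JOIN keeps for a payment type with no trips
SELECT p.payment_type_name, SUM(t.passenger_count) AS Passengers, COUNT(t.payment_type) AS Trips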
FROM tmpVwPaymentType AS p
LEFT JOIN tmpVwTrip AS t
ON p.payment_type_id = t.payment_type
GROUP BY p.payment_type_name
ORDER BY Passengers DESC;

<Spark SQL result set with 6 rows and 3 fields>
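Because the query starts from the payment type lookup and LEFT JOINs the trips, a payment type with no matching trips still yields one row whose trip columns are all NULL. COUNT(*) counts that row, so such a type reports Trips = 1 with Passengers = NULL, while counting a trip-side column such as COUNT(t.payment_type) reports 0. The sketch below demonstrates the difference with Python's built-in sqlite3 module instead of Spark, on a tiny made-up sample (the table contents are hypothetical); SQLite applies the same standard LEFT JOIN and COUNT semantics.

```python
import sqlite3

# Hypothetical miniature versions of the payment type lookup and trip data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payment_type (payment_type_id INTEGER, payment_type_name TEXT)")
conn.execute("CREATE TABLE trip (payment_type INTEGER, passenger_count INTEGER)")
conn.executemany(
    "INSERT INTO payment_type VALUES (?, ?)",
    [(1, "Credit card"), (2, "Cash"), (6, "Voided trip")],
)
# No trip uses payment type 6, so its LEFT JOIN row has NULL trip columns.
conn.executemany("INSERT INTO trip VALUES (?, ?)", [(1, 1), (1, 2), (2, 1)])

query = """
SELECT p.payment_type_name,
       SUM(t.passenger_count) AS Passengers,
       COUNT(*)               AS trips_count_star,
       COUNT(t.payment_type)  AS trips_count_column
FROM payment_type AS p
LEFT JOIN trip AS t ON p.payment_type_id = t.payment_type
GROUP BY p.payment_type_name
ORDER BY p.payment_type_name
"""
rows = conn.execute(query).fetchall()
conn.close()

for row in rows:
    print(row)
```

The last row, ("Voided trip", None, 1, 0), shows COUNT(*) reporting one trip for a payment type that has none.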

In [2]:
df_output = spark.sql(
"""
SELECT p.payment_type_name, SUM(t.passenger_count) AS Passengers, COUNT(t.payment_type) AS Trips
FROM tmpVwPaymentType AS p
LEFT JOIN tmpVwTrip AS t
ON p.payment_type_id = t.payment_type
GROUP BY p.payment_type_name
ORDER BY Passengers DESC
""")
# The save mode is set with .mode(); without it, a second run fails because the path already exists
df_output.write.format("delta").mode("overwrite").save("Files/output/trips_by_payment_type")

## Query data from tables

In [4]:
df = spark.sql("SELECT * FROM DemoLakehouse.trip")
display(df.tail(5))

In [5]:
%%sql
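-- COUNT(t.payment_type) counts matched trips only; COUNT(*) would also count the NULL row a LEFT JOIN keeps for a payment type with no trips
SELECT p.payment_type_name, SUM(t.passenger_count) AS Passengers, COUNT(t.payment_type) AS Trips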
FROM DemoLakehouse.payment_type AS p
LEFT JOIN DemoLakehouse.trip AS t
ON p.payment_type_id = t.payment_type
GROUP BY p.payment_type_name
ORDER BY Passengers DESC;

<Spark SQL result set with 6 rows and 3 fields>

In [4]:
%%sql
CREATE OR REPLACE TABLE trip_agg
--USING DELTA
AS
SELECT p.payment_type_name, SUM(t.passenger_count) AS Passengers, COUNT(t.payment_type) AS Trips
FROM DemoLakehouse.payment_type AS p
LEFT JOIN DemoLakehouse.trip AS t
ON p.payment_type_id = t.payment_type
GROUP BY p.payment_type_name
ORDER BY Passengers DESC;

<Spark SQL result set with 0 rows and 0 fields>

## **Beyond SQL: Use Python to work on data quality**

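Some DataFrame functions worth learning:
- dropDuplicates(): removes duplicate rows
- na.drop() (alias dropna()): removes rows that contain null values
- na.fill() (alias fillna()): replaces null values with a given value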

### Detect (and remove) duplicate rows

In [6]:
df = spark.sql("SELECT * FROM DemoLakehouse.trip")
# Surplus copies: total rows minus distinct rows (all columns are compared)
duplicate_rows = df.count() - df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_rows}")

Number of duplicate rows: 0
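The subtraction above counts surplus copies rather than every row involved in a duplicate: a row stored three times adds 2, not 3. The plain-Python sketch below illustrates the difference on made-up tuples standing in for trip rows; comparing whole tuples mimics dropDuplicates() with no arguments, which compares all columns (dropDuplicates() also accepts a list of column names to compare only those).

```python
from collections import Counter

# Made-up rows standing in for trip records (values are illustrative only).
rows = [
    (1, 1, 10.0),
    (1, 1, 10.0),
    (1, 1, 10.0),  # the same row stored three times
    (2, 2, 7.5),
    (2, 1, 7.5),   # differs from the previous row in one field, so not a duplicate
]

counts = Counter(rows)

# What count() - dropDuplicates().count() reports: rows minus distinct rows.
surplus_copies = len(rows) - len(counts)

# Every row that belongs to a group appearing more than once.
rows_in_duplicate_groups = sum(n for n in counts.values() if n > 1)

print(surplus_copies, rows_in_duplicate_groups)
```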


### Detect (and remove) rows with null values

In [7]:
df = spark.sql("SELECT * FROM DemoLakehouse.trip")
# na.drop() defaults to how="any": a row is dropped if any of its columns is null
rows_with_nulls = df.count() - df.na.drop().count()
print(f"Number of rows with nulls: {rows_with_nulls}")

Number of rows with nulls: 71503
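Since na.drop() defaults to how="any", the count above covers rows with a null in at least one column. In PySpark, df.na.drop(how="all") drops only rows that are entirely null, and df.na.drop(subset=["passenger_count"]) checks only the listed columns. The sketch below mimics those three modes in plain Python on made-up rows; drop_nulls is a hypothetical helper, not a PySpark function.

```python
# Made-up rows; None plays the role of a SQL NULL.
rows = [
    {"passenger_count": 1, "RatecodeID": 1, "fare_amount": 10.0},
    {"passenger_count": None, "RatecodeID": None, "fare_amount": 8.0},
    {"passenger_count": 2, "RatecodeID": None, "fare_amount": 5.5},
    {"passenger_count": None, "RatecodeID": None, "fare_amount": None},
]


def drop_nulls(rows, how="any", subset=None):
    """Hypothetical helper mimicking DataFrame.na.drop(how=..., subset=...) on dicts."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in columns]
        # "any": drop if at least one checked column is null; "all": only if every one is.
        dropped = any(nulls) if how == "any" else all(nulls)
        if not dropped:
            kept.append(row)
    return kept


rows_with_any_null = len(rows) - len(drop_nulls(rows))
rows_all_null = len(rows) - len(drop_nulls(rows, how="all"))
rows_missing_passengers = len(rows) - len(drop_nulls(rows, subset=["passenger_count"]))
print(rows_with_any_null, rows_all_null, rows_missing_passengers)
```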


### Profile columns for null values

In [8]:
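# Import the module under an alias so PySpark's sum() does not shadow Python's built-in sum()
from pyspark.sql import functions as F

# For each column, count the rows where the value is null (True cast to 1, False to 0)
display(df.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns)))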

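Once the profile shows which columns contain nulls, na.fill() (or its alias fillna()) can replace them. In PySpark both accept a dict mapping column names to replacement values, e.g. df.na.fill({"passenger_count": 1}). The sketch below profiles made-up rows in plain Python (summing an is-null indicator per column, the same idea as summing col(c).isNull().cast("int")) and then fills nulls per column; the replacement values are hypothetical and illustrative, not a recommendation.

```python
# Made-up rows; None plays the role of a SQL NULL.
rows = [
    {"passenger_count": 1, "store_and_fwd_flag": "N", "airport_fee": 0.0},
    {"passenger_count": None, "store_and_fwd_flag": None, "airport_fee": None},
    {"passenger_count": 3, "store_and_fwd_flag": "Y", "airport_fee": None},
]

# Null count per column: add 1 for every row where the value is None.
columns = list(rows[0])
null_counts = {c: sum(int(row[c] is None) for row in rows) for c in columns}
print(null_counts)

# Hypothetical per-column replacement values, mimicking df.na.fill({...}).
fill_values = {"passenger_count": 1, "store_and_fwd_flag": "N", "airport_fee": 0.0}
filled = [
    {c: (fill_values[c] if v is None else v) for c, v in row.items()}
    for row in rows
]
print(filled[1])
```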

### Detect outliers in a numeric column

In [9]:
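numeric_column = "passenger_count"
# 1st and 99th percentiles; relativeError=0.0 computes exact quantiles, which can be
# expensive on large data. approxQuantile ignores null values.
lower_bound, upper_bound = df.approxQuantile(numeric_column, [0.01, 0.99], 0.0)

# Keep rows whose value lies outside the [1st, 99th] percentile range
outliers = df.filter((df[numeric_column] < lower_bound) | (df[numeric_column] > upper_bound))
display(outliers)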

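The percentile filter above can be reproduced in plain Python to see how the bounds behave on a discrete column such as passenger_count. The sketch below uses made-up values and a nearest-rank quantile; nearest_rank_quantile is a hypothetical helper, and its rank rule is an assumption that may not match Spark's exact quantile for every input. Like approxQuantile, it skips nulls before computing the bounds.

```python
import math


def nearest_rank_quantile(sorted_values, p):
    """Hypothetical helper: value at 1-based rank ceil(p * n) of a sorted list."""
    n = len(sorted_values)
    rank = max(1, math.ceil(p * n))
    return sorted_values[rank - 1]


# Made-up passenger counts; None stands for a SQL NULL and is skipped.
passenger_counts = [1] * 60 + [2] * 25 + [3] * 10 + [0] * 2 + [6] * 2 + [9] + [None] * 5

values = sorted(v for v in passenger_counts if v is not None)
lower_bound = nearest_rank_quantile(values, 0.01)
upper_bound = nearest_rank_quantile(values, 0.99)

# Values strictly outside [lower_bound, upper_bound] are flagged as outliers.
outliers = [v for v in values if v < lower_bound or v > upper_bound]
print(lower_bound, upper_bound, outliers)
```

With heavily clustered integer data the bounds land on existing values, so only the single 9 falls outside the range here; a different rank rule could move the bounds by one position.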