<a href="https://colab.research.google.com/github/poolaNaveen/PySpark/blob/main/pyspark6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**BRONZE, SILVER, GOLD**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [2]:
!pip uninstall -y pyspark


Found existing installation: pyspark 3.5.0
Uninstalling pyspark-3.5.0:
  Successfully uninstalled pyspark-3.5.0


In [3]:
!pip install pyspark==3.5.1 delta-spark==3.2.0


Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=e29d6b3d1c521809e0783194a0c6d67f199d592aa57748d771b85f05b6126497
  Stored in directory: /root/.cache/pip/wheels/b1/91/5f/283b53010a8016a4ff1c4a1edd99bbe73afacb099645b5471b
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("citibike") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("DeltaTest") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a small DataFrame
df = spark.createDataFrame([(1, "bikeA"), (2, "bikeB")], ["id", "name"])

# Write to Delta
df.write.format("delta").mode("overwrite").save("/content/test_delta")

# Read back
df2 = spark.read.format("delta").load("/content/test_delta")
df2.show()


+---+-----+
| id| name|
+---+-----+
|  2|bikeB|
|  1|bikeA|
+---+-----+



In [7]:
# 3. Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, current_timestamp
from pyspark.sql.types import DecimalType, TimestampType
from delta import configure_spark_with_delta_pip

# 4. Configure Spark with Delta Lake
builder = SparkSession.builder \
    .appName("citibike") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# 5. Read Landing Layer CSV
landing_df = spark.read.csv("/content/travel.csv",
                            header=True,
                            inferSchema=True)

# 6. Basic cleaning
clean_df = landing_df.select(
    trim(col("ride_id")).alias("ride_id"),
    trim(col("rideable_type")).alias("rideable_type"),
    trim(col("started_at")).alias("started_at"),
    trim(col("ended_at")).alias("ended_at"),
    trim(col("start_station_name")).alias("start_station_name"),
    trim(col("start_station_id")).alias("start_station_id"),
    trim(col("end_station_name")).alias("end_station_name"),
    trim(col("end_station_id")).alias("end_station_id"),
    col("start_lat").cast(DecimalType(10, 6)),
    col("start_lng").cast(DecimalType(10, 6)),
    col("end_lat").cast(DecimalType(10, 6)),
    col("end_lng").cast(DecimalType(10, 6)),
    trim(col("member_casual")).alias("member_casual")
)

# 7. Convert timestamps and add metadata
bronze_df = clean_df.withColumn("started_at", col("started_at").cast(TimestampType())) \
                    .withColumn("ended_at", col("ended_at").cast(TimestampType())) \
                    .withColumn("metadata", current_timestamp())

# 8. Save as Delta Lake Bronze Table
bronze_df.write.format("delta") \
    .mode("overwrite") \
    .save("/content/bronze/jc_citibike")

# 9. Read back to verify
df = spark.read.format("delta").load("/content/bronze/jc_citibike")
df.show(5)


+----------------+-------------+----------+--------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+--------------------+
|         ride_id|rideable_type|started_at|ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|            metadata|
+----------------+-------------+----------+--------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+--------------------+
|29DAF43DD84B4B7A|electric_bike|      NULL|    NULL|   6 St & Grand St|           HB302|Mama Johnson Fiel...|         HB404|40.744398|-74.034501|40.743140|-74.040041|       member|2025-12-02 15:16:...|
|B11B4220F7195025|electric_bike|      NULL|    NULL|  Heights Elevator|           JC059|        Jersey & 3rd|         JC074|40.748716|-74.040443|40.723332|-74.045953|       member|2025-12-02 1

In [8]:
from pyspark.sql import functions as F

# 1. Read Bronze Delta Table
bronze_df = spark.read.format("delta").load("/content/bronze/jc_citibike")

# 2. Enrich data for Silver layer
silver_df = bronze_df \
    .withColumn("ride_duration_minutes",
                (F.col("ended_at").cast("long") - F.col("started_at").cast("long")) / 60) \
    .withColumn("member_type",
                F.when(F.col("member_casual") == "member", "Member")
                 .when(F.col("member_casual") == "casual", "Casual")
                 .otherwise("Unknown")) \
    .filter(F.col("started_at").isNotNull() & F.col("ended_at").isNotNull()) \
    .filter(F.col("ride_duration_minutes") > 0)

# 3. Save Silver Delta Table
silver_df.write.format("delta") \
    .mode("overwrite") \
    .save("/content/silver/jc_citibike")

# 4. Read back to verify
df_silver = spark.read.format("delta").load("/content/silver/jc_citibike")
df_silver.show(5)


+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+--------+---------------------+-----------+
|ride_id|rideable_type|started_at|ended_at|start_station_name|start_station_id|end_station_name|end_station_id|start_lat|start_lng|end_lat|end_lng|member_casual|metadata|ride_duration_minutes|member_type|
+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+--------+---------------------+-----------+
+-------+-------------+----------+--------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+--------+---------------------+-----------+



In [10]:
from pyspark.sql import functions as F

# 1. Read Silver Delta Table
silver_df = spark.read.format("delta").load("/content/silver/jc_citibike")

# 2. Aggregate for Gold layer
gold_df = silver_df.groupBy(
    F.date_format("started_at", "yyyy-MM-dd").alias("ride_date"),
    "member_type",
    "start_station_name"
).agg(
    F.count("*").alias("total_rides"),
    F.avg("ride_duration_minutes").alias("avg_duration_minutes"),
    F.min("ride_duration_minutes").alias("min_duration_minutes"),
    F.max("ride_duration_minutes").alias("max_duration_minutes")
)

# 3. Save Gold Delta Table
gold_df.write.format("delta") \
    .mode("overwrite") \
    .save("/content/gold/jc_citibike")

# 4. Read back to verify
df_gold = spark.read.format("delta").load("/content/gold/jc_citibike")
df_gold.show(30)


+---------+-----------+------------------+-----------+--------------------+--------------------+--------------------+
|ride_date|member_type|start_station_name|total_rides|avg_duration_minutes|min_duration_minutes|max_duration_minutes|
+---------+-----------+------------------+-----------+--------------------+--------------------+--------------------+
+---------+-----------+------------------+-----------+--------------------+--------------------+--------------------+



In [12]:
landing_df = spark.read.csv("/content/travel.csv", header=True, inferSchema=True)


In [14]:
from pyspark.sql.functions import col, trim, current_timestamp
from pyspark.sql.types import DecimalType, TimestampType

bronze_df = landing_df.select(
    trim(col("ride_id")).alias("ride_id"),
    trim(col("rideable_type")).alias("rideable_type"),
    trim(col("started_at")).cast(TimestampType()).alias("started_at"),
    trim(col("ended_at")).cast(TimestampType()).alias("ended_at"),
    trim(col("start_station_name")).alias("start_station_name"),
    trim(col("start_station_id")).alias("start_station_id"),
    trim(col("end_station_name")).alias("end_station_name"),
    trim(col("end_station_id")).alias("end_station_id"),
    col("start_lat").cast(DecimalType(10, 6)),
    col("start_lng").cast(DecimalType(10, 6)),
    col("end_lat").cast(DecimalType(10, 6)),
    col("end_lng").cast(DecimalType(10, 6)),
    trim(col("member_casual")).alias("member_casual")
).withColumn("metadata", current_timestamp())
bronze_df.show(10)

bronze_df.write.format("delta").mode("overwrite").save("/content/bronze/jc_citibike")


+----------------+-------------+----------+--------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+--------------------+
|         ride_id|rideable_type|started_at|ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|            metadata|
+----------------+-------------+----------+--------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+--------------------+
|29DAF43DD84B4B7A|electric_bike|      NULL|    NULL|   6 St & Grand St|           HB302|Mama Johnson Fiel...|         HB404|40.744398|-74.034501|40.743140|-74.040041|       member|2025-12-02 15:28:...|
|B11B4220F7195025|electric_bike|      NULL|    NULL|  Heights Elevator|           JC059|        Jersey & 3rd|         JC074|40.748716|-74.040443|40.723332|-74.045953|       member|2025-12-02 1

In [24]:
bronze_df = spark.read.format("delta").load("/content/bronze/jc_citibike")
bronze_df.select("ride_id","started_at","ended_at").show(10, False)
silver_df = bronze_df.select(
    col("ride_id"),
    to_date(col("started_at")).alias("trip_start_date"),
    col("started_at"),
    col("ended_at"),
    col("start_station_name"),
    col("end_station_name"),
    ((col("ended_at").cast("long") - col("started_at").cast("long")) / 60).alias("trip_duration_mins"),
    col("metadata")
)

silver_df.show(10, False)
from pyspark.sql.functions import to_timestamp

bronze_df = landing_df.select(
    col("ride_id"),
    to_timestamp(col("started_at"), "yyyy-MM-dd HH:mm:ss").alias("started_at"),
    to_timestamp(col("ended_at"), "yyyy-MM-dd HH:mm:ss").alias("ended_at"),
    # ... other columns ...
)



+----------------+----------+--------+
|ride_id         |started_at|ended_at|
+----------------+----------+--------+
|DDD973FEED07C0DF|NULL      |NULL    |
|7356497BFB2EDA2F|NULL      |NULL    |
|10743EF6640BBB15|NULL      |NULL    |
|8FC91C5AC697CB78|NULL      |NULL    |
|B600C6359586E0E8|NULL      |NULL    |
|FA1DDE4B52ADBF5A|NULL      |NULL    |
|F41D88A622E8050C|NULL      |NULL    |
|851A4ED5495DE630|NULL      |NULL    |
|23165E2C35306B50|NULL      |NULL    |
|99D3D0B592E30B99|NULL      |NULL    |
+----------------+----------+--------+
only showing top 10 rows

+----------------+---------------+----------+--------+----------------------------------------+------------------------------------------+------------------+--------------------------+
|ride_id         |trip_start_date|started_at|ended_at|start_station_name                      |end_station_name                          |trip_duration_mins|metadata                  |
+----------------+---------------+----------+--------+----

In [4]:
!pip install pyspark==3.5.1 delta-spark==3.2.0

Collecting delta-spark==3.2.0
  Downloading delta_spark-3.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0


In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("citibike") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [6]:
silver_df = spark.read.format("delta").load("/content/silver/jc_citibike")
silver_df.show(10, False)


Py4JJavaError: An error occurred while calling o32.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


In [3]:
# Gold layer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,to_date
spark = SparkSession.builder.appName("citibike").getOrCreate()
silver_df = spark.read.format("delta").load("/content/silver/jc_citibike")
silver_df.show(10, False)


Py4JJavaError: An error occurred while calling o26.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more
