<a href="https://colab.research.google.com/github/usergotnewexp/Pyspark-pipeline-implementation/blob/main/Pyspark_pipeline_imp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q pyspark==3.5.1
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = (SparkSession.builder.appName("ColabSparkAdvancedTutorial")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "4")
         .getOrCreate())
print("Spark version:", spark.version)

data = [
    (1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
    (2, "Bob", "US", "2025-10-03", 43000.0, "standard"),
    (3, "Carlos", "IN", "2025-09-27", 72000.0, "premium"),
    (4, "Diana", "UK", "2025-09-30", 39000.0, "standard"),
    (5, "Esha", "IN", "2025-10-02", 85000.0, "premium"),
    (6, "Farid", "AE", "2025-10-02", 31000.0, "basic"),
    (7, "Gita", "IN", "2025-09-29", 46000.0, "standard"),
    (8, "Hassan", "PK", "2025-10-01", 52000.0, "premium"),
]
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", StringType(), True),
    StructField("income", FloatType(), True),
    StructField("plan", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 1.0.1 requires pyspark[connect]~=4.0.0, but you have pyspark 3.5.1 which is incompatible.[0m[31m
[0mSpark version: 3.5.1
+---+------+-------+-----------+-------+--------+
| id|  name|country|signup_date| income|    plan|
+---+------+-------+-----------+-------+--------+
|  1| Alice|     IN| 2025-10-01|56000.0| premium|
|  2|   Bob|     US| 2025-10-03|43000.0|standard|
|  3|Carlos|     IN| 2025-09-27|72000.0| premium|
|  4|

In [2]:
df2 = (df.withColumn("signup_ts", F.to_timestamp("signup_date"))
         .withColumn("year", F.year("signup_ts"))
         .withColumn("month", F.month("signup_ts"))
         .withColumn("is_india", (F.col("country") == "IN").cast("int")))
df2.show()

df2.createOrReplaceTempView("users")
spark.sql("""
SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income
FROM users
GROUP BY country
ORDER BY cnt DESC
""").show()

w = Window.partitionBy("country").orderBy(F.col("income").desc())
df_ranked = df2.withColumn("income_rank_in_country", F.rank().over(w))
df_ranked.show()

def plan_priority(plan):
    if plan == "premium": return 3
    if plan == "standard": return 2
    if plan == "basic": return 1
    return 0
plan_priority_udf = F.udf(plan_priority, IntegerType())
df_udf = df_ranked.withColumn("plan_priority", plan_priority_udf(F.col("plan")))
df_udf.show()

+---+------+-------+-----------+-------+--------+-------------------+----+-----+--------+
| id|  name|country|signup_date| income|    plan|          signup_ts|year|month|is_india|
+---+------+-------+-----------+-------+--------+-------------------+----+-----+--------+
|  1| Alice|     IN| 2025-10-01|56000.0| premium|2025-10-01 00:00:00|2025|   10|       1|
|  2|   Bob|     US| 2025-10-03|43000.0|standard|2025-10-03 00:00:00|2025|   10|       0|
|  3|Carlos|     IN| 2025-09-27|72000.0| premium|2025-09-27 00:00:00|2025|    9|       1|
|  4| Diana|     UK| 2025-09-30|39000.0|standard|2025-09-30 00:00:00|2025|    9|       0|
|  5|  Esha|     IN| 2025-10-02|85000.0| premium|2025-10-02 00:00:00|2025|   10|       1|
|  6| Farid|     AE| 2025-10-02|31000.0|   basic|2025-10-02 00:00:00|2025|   10|       0|
|  7|  Gita|     IN| 2025-09-29|46000.0|standard|2025-09-29 00:00:00|2025|    9|       1|
|  8|Hassan|     PK| 2025-10-01|52000.0| premium|2025-10-01 00:00:00|2025|   10|       0|
+---+-----

In [3]:
country_data = [
    ("IN", "Asia", 1.42), ("US", "North America", 0.33),
    ("UK", "Europe", 0.07), ("AE", "Asia", 0.01), ("PK", "Asia", 0.24),
]
country_schema = StructType([
    StructField("country", StringType(), True),
    StructField("region", StringType(), True),
    StructField("population_bn", FloatType(), True),
])
country_df = spark.createDataFrame(country_data, country_schema)

joined = df_udf.alias("u").join(country_df.alias("c"), on="country", how="left")
joined.show()

region_stats = (joined.groupBy("region", "plan")
                .agg(F.count("*").alias("users"),
                     F.round(F.avg("income"), 2).alias("avg_income"))
                .orderBy("region", "plan"))
region_stats.show()

+-------+---+------+-----------+-------+--------+-------------------+----+-----+--------+----------------------+-------------+-------------+-------------+
|country| id|  name|signup_date| income|    plan|          signup_ts|year|month|is_india|income_rank_in_country|plan_priority|       region|population_bn|
+-------+---+------+-----------+-------+--------+-------------------+----+-----+--------+----------------------+-------------+-------------+-------------+
|     PK|  8|Hassan| 2025-10-01|52000.0| premium|2025-10-01 00:00:00|2025|   10|       0|                     1|            3|         Asia|         0.24|
|     UK|  4| Diana| 2025-09-30|39000.0|standard|2025-09-30 00:00:00|2025|    9|       0|                     1|            2|       Europe|         0.07|
|     US|  2|   Bob| 2025-10-03|43000.0|standard|2025-10-03 00:00:00|2025|   10|       0|                     1|            2|North America|         0.33|
|     AE|  6| Farid| 2025-10-02|31000.0|   basic|2025-10-02 00:00:00|2

In [4]:
ml_df = joined.withColumn("label", (F.col("plan") == "premium").cast("int")).na.drop()
country_indexer = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
country_fitted = country_indexer.fit(ml_df)
ml_df2 = country_fitted.transform(ml_df)

assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="features")
ml_final = assembler.transform(ml_df2)
train_df, test_df = ml_final.randomSplit([0.7, 0.3], seed=42)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
lr_model = lr.fit(train_df)
preds = lr_model.transform(test_df)
preds.select("name", "country", "income", "plan", "label", "prediction", "probability").show(truncate=False)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(preds)
print("Classification accuracy:", acc)

+------+-------+-------+--------+-----+----------+-----------------------------------------+
|name  |country|income |plan    |label|prediction|probability                              |
+------+-------+-------+--------+-----+----------+-----------------------------------------+
|Carlos|IN     |72000.0|premium |1    |1.0       |[5.631679339875724E-9,0.9999999943683207]|
|Diana |UK     |39000.0|standard|0    |0.0       |[0.9999997637769654,2.362230345775984E-7]|
+------+-------+-------+--------+-----+----------+-----------------------------------------+

Classification accuracy: 1.0


In [5]:
output_path = "/content/spark_users_parquet"
joined.write.mode("overwrite").parquet(output_path)
parquet_df = spark.read.parquet(output_path)
print("Parquet reloaded:")
parquet_df.show()

recent = spark.sql("""
SELECT name, country, income, signup_ts
FROM users
WHERE signup_ts >= '2025-10-01'
ORDER BY signup_ts DESC
""")
recent.show()

recent.explain()
spark.stop()


Parquet reloaded:
+-------+---+------+-----------+-------+--------+-------------------+----+-----+--------+----------------------+-------------+-------------+-------------+
|country| id|  name|signup_date| income|    plan|          signup_ts|year|month|is_india|income_rank_in_country|plan_priority|       region|population_bn|
+-------+---+------+-----------+-------+--------+-------------------+----+-----+--------+----------------------+-------------+-------------+-------------+
|     PK|  8|Hassan| 2025-10-01|52000.0| premium|2025-10-01 00:00:00|2025|   10|       0|                     1|            3|         Asia|         0.24|
|     UK|  4| Diana| 2025-09-30|39000.0|standard|2025-09-30 00:00:00|2025|    9|       0|                     1|            2|       Europe|         0.07|
|     US|  2|   Bob| 2025-10-03|43000.0|standard|2025-10-03 00:00:00|2025|   10|       0|                     1|            2|North America|         0.33|
|     AE|  6| Farid| 2025-10-02|31000.0|   basic|202