In [3]:
from pyspark.sql import SparkSession

# 1) Initialize SparkSession
spark = SparkSession.builder \
    .appName("YourAppName") \
    .getOrCreate()

# 2) Then you can read the CSV
df = spark.read.option("header", "true").csv("/Users/sanjojoy/Documents/MDA/data/kafka_data.csv", inferSchema=True)

# 3) Create a temp view for Spark SQL
df.createOrReplaceTempView("dating_data")

# 4) Now you can run a query
result = spark.sql("""
    SELECT
      state,
      COUNT(*) AS user_count
    FROM dating_data
    GROUP BY state
    ORDER BY user_count DESC
""")
result.show()

+--------------+----------+
|         state|user_count|
+--------------+----------+
|      Colorado|       238|
|         Texas|       232|
|       Indiana|       230|
|        Oregon|       229|
|    New Jersey|       229|
|     Minnesota|       225|
|    California|       225|
|       Alabama|       221|
|          Ohio|       219|
|      Oklahoma|       219|
|       Georgia|       218|
|      Kentucky|       218|
|      Maryland|       216|
|      Delaware|       214|
|       Arizona|       213|
|     Wisconsin|       213|
|       Wyoming|       212|
|South Carolina|       212|
|      New York|       212|
|          Utah|       210|
+--------------+----------+
only showing top 20 rows



In [None]:
1. COUNT OF USERS BY STATE

SyntaxError: invalid syntax (3510642606.py, line 1)

25/03/13 16:49:51 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 246592 ms exceeds timeout 120000 ms
25/03/13 16:49:51 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/13 16:49:52 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
#!/usr/bin/env python3
"""
spark_sql_queries.py

Demonstrates five different Spark SQL queries on a dating dataset, with basic visualizations.
"""

import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession

def main():
    # 1. Initialize Spark session
    spark = (
        SparkSession.builder
        .appName("SparkSQL_Queries")
        .master("local[4]")
        .getOrCreate()
    )

    # 2. Load data and create a temp view
    #    Update the file path if needed
    df = spark.read.option("header", "true").csv("/Users/sanjojoy/Documents/MDA/data/kafka_data.csv", inferSchema=True)
    df.createOrReplaceTempView("dating_data")
    print("Data loaded. Temp view 'dating_data' created.\n")

    # =====================
    # Query 1: Count of Users by State
    # =====================
    print("#1) Count of Users by State (descending)")

    query1 = """
        SELECT
          state,
          COUNT(*) AS user_count
        FROM dating_data
        GROUP BY state
        ORDER BY user_count DESC
    """
    result1 = spark.sql(query1)
    result1.show()

    pdf1 = result1.toPandas()

    # Visualization
    plt.figure(figsize=(8, 5))
    plt.bar(pdf1["state"].astype(str), pdf1["user_count"])
    plt.xticks(rotation=45, ha="right")
    plt.title("Count of Users by State")
    plt.xlabel("State")
    plt.ylabel("User Count")
    plt.tight_layout()
    plt.savefig("query1_state_usercount.png", dpi=100)
    plt.close()

    # =====================
    # Query 2: Average Swiping History by Gender
    # =====================
    print("#2) Average Swiping History by Gender (descending)")

    query2 = """
        SELECT
          gender,
          AVG(swiping_history) AS avg_swipes
        FROM dating_data
        GROUP BY gender
        ORDER BY avg_swipes DESC
    """
    result2 = spark.sql(query2)
    result2.show()

    pdf2 = result2.toPandas()

    plt.figure(figsize=(6, 4))
    plt.bar(pdf2["gender"].astype(str), pdf2["avg_swipes"])
    plt.title("Average Swiping History by Gender")
    plt.xlabel("Gender")
    plt.ylabel("Avg Swipes")
    plt.savefig("query2_gender_avg_swipes.png", dpi=100)
    plt.close()

    # =====================
    # Query 3: Daily High-Swipers
    # =====================
    print("#3) Daily High-Swipers (user_id, swiping_history, frequency_of_usage)")

    query3 = """
        SELECT
          user_id,
          swiping_history,
          frequency_of_usage
        FROM dating_data
        WHERE frequency_of_usage = 'Daily'
          AND swiping_history > 300
    """
    result3 = spark.sql(query3)
    result3.show()

    pdf3 = result3.toPandas()

    # We'll plot a histogram of swiping_history among these daily high-swipers
    plt.figure(figsize=(6, 4))
    plt.hist(pdf3["swiping_history"], bins=10, edgecolor="black")
    plt.title("Swiping History among Daily High-Swipers (>300)")
    plt.xlabel("Swiping History")
    plt.ylabel("Count of Users")
    plt.savefig("query3_daily_highswipers.png", dpi=100)
    plt.close()

    # =====================
    # Query 4: Count of “Looking For” Category, Grouped by Gender
    # =====================
    print("#4) Count of Looking For Category, Grouped by Gender")

    query4 = """
        SELECT
          gender,
          looking_for,
          COUNT(*) AS total_users
        FROM dating_data
        GROUP BY gender, looking_for
        ORDER BY total_users DESC
    """
    result4 = spark.sql(query4)
    result4.show()

    pdf4 = result4.toPandas()

    # We'll pivot for stacked bar or grouped bar
    pivot4 = pdf4.pivot(index="gender", columns="looking_for", values="total_users").fillna(0)

    pivot4.plot(kind="bar", figsize=(7, 4), stacked=True)
    plt.title("Count of Looking For by Gender")
    plt.xlabel("Gender")
    plt.ylabel("Total Users")
    plt.xticks(rotation=0)
    plt.legend(title="Looking For")
    plt.tight_layout()
    plt.savefig("query4_lookingfor_gender.png", dpi=100)
    plt.close()

    # =====================
    # Query 5: Average Height by Occupation
    # =====================
    print("#5) Average Height by Occupation (descending)")

    query5 = """
        SELECT
          occupation,
          ROUND(AVG(height), 2) AS avg_height
        FROM dating_data
        GROUP BY occupation
        ORDER BY avg_height DESC
    """
    result5 = spark.sql(query5)
    result5.show()

    pdf5 = result5.toPandas()

    plt.figure(figsize=(7, 5))
    plt.bar(pdf5["occupation"].astype(str), pdf5["avg_height"])
    plt.xticks(rotation=45, ha="right")
    plt.title("Average Height by Occupation")
    plt.xlabel("Occupation")
    plt.ylabel("Avg Height")
    plt.tight_layout()
    plt.savefig("query5_occupation_avgheight.png", dpi=100)
    plt.close()

    # Stop Spark
    spark.stop()
    print("Done! All queries executed, results shown, and plots saved.")

if __name__ == "__main__":
    main()