In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# Source tables.
bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)
members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# Start times of every booking made by the member named David Farrell.
is_david_farrell = (members_df["firstname"] == 'David') & (members_df["surname"] == 'Farrell')

result_df = (
    members_df.join(bookings_df, members_df["memid"] == bookings_df["memid"])
    .where(is_david_farrell)
    .select(bookings_df["starttime"])
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

facilities_df = spark.read.csv("path_to_cd_facilities.csv", header=True, inferSchema=True)
bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Bookings of either tennis court that start on 2012-09-21, earliest first.
# The day is expressed as the half-open range [2012-09-21, 2012-09-22), so any
# time-of-day on that date matches.
result_df = (
    facilities_df
    .join(bookings_df, facilities_df.facid == bookings_df.facid)
    .filter(
        col("name").isin(['Tennis Court 2', 'Tennis Court 1'])
        & (col("starttime") >= '2012-09-21')
        & (col("starttime") < '2012-09-22')
    )
    .select(col("starttime").alias("start"), col("name"))
    # Order by the projected alias. "starttime" no longer exists after the
    # select above, and sorting on a dropped column relies on the analyzer
    # re-resolving it from the child plan.
    .orderBy(col("start"))
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# The same table is used twice: "mems" is the member, and "recs" is the member
# referenced by mems.recommendedby.
mems_df = members_df.alias("mems")
recs_df = members_df.alias("recs")

# A left outer join keeps members with no recommender; their rec* columns are null.
recommended_by = col("recs.memid") == col("mems.recommendedby")
output_columns = [
    col("mems.firstname").alias("memfname"),
    col("mems.surname").alias("memsname"),
    col("recs.firstname").alias("recfname"),
    col("recs.surname").alias("recsname"),
]

result_df = (
    mems_df.join(recs_df, on=recommended_by, how="left_outer")
    .select(*output_columns)
    .orderBy("memsname", "memfname")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)
bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)
facilities_df = spark.read.csv("path_to_cd_facilities.csv", header=True, inferSchema=True)

# Alias every input so the qualified names used below ("mems.*", "bks.*", "facs.*")
# resolve. Without these aliases col("mems.firstname") / col("facs.name") cannot be
# resolved and Spark raises an AnalysisException.
mems = members_df.alias("mems")
bks = bookings_df.alias("bks")
facs = facilities_df.alias("facs")

# Distinct (member full name, tennis court) pairs for members who booked a tennis court.
result_df = (
    mems
    .join(bks, col("mems.memid") == col("bks.memid"))
    .join(facs, col("bks.facid") == col("facs.facid"))
    .filter(col("facs.name").isin(['Tennis Court 2', 'Tennis Court 1']))
    .select(
        concat_ws(' ', col("mems.firstname"), col("mems.surname")).alias("member"),
        col("facs.name").alias("facility"),
    )
    .distinct()
    .orderBy("member", "facility")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

mems_df = members_df.alias("mems")
recs_df = members_df.alias("recs")

# Every member with the full name of whoever recommended them (null if nobody did).
# - DataFrame.select() cannot take another DataFrame as a correlated scalar
#   subquery, so the lookup is a left self-join on recommendedby -> memid.
#   Presumably memid is unique, so each member matches at most one recommender
#   (TODO confirm).
# - Names are built with concat(..., lit(" "), ...) rather than `+`. On string
#   columns `+` is numeric addition in Spark and evaluates to null.
# - concat() returns null when any input is null, so members without a
#   recommender get a null recommender rather than a stray " ".
result_df = (
    mems_df
    .join(recs_df, col("recs.memid") == col("mems.recommendedby"), "left_outer")
    .select(
        concat(col("mems.firstname"), lit(" "), col("mems.surname")).alias("member"),
        concat(col("recs.firstname"), lit(" "), col("recs.surname")).alias("recommender"),
    )
    .distinct()
    .orderBy("member")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# How many members each recommender has recommended. Rows with a null
# recommendedby are dropped before grouping.
has_recommender = col("recommendedby").isNotNull()

result_df = (
    members_df.where(has_recommender)
    .groupBy("recommendedby")
    .count()
    .sort("recommendedby")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Total slots booked per facility.
# F.sum is reached through the module alias. `from pyspark.sql.functions import sum`
# would shadow Python's built-in sum() for every later cell in the kernel.
result_df = (
    bookings_df
    .groupBy("facid")
    .agg(F.sum("slots").alias("Total Slots"))
    .orderBy("facid")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Total slots booked per facility during September 2012, smallest total first.
# The month is the half-open range [2012-09-01, 2012-10-01).
# The F alias avoids shadowing Python's built-in sum() in the shared kernel.
in_september_2012 = (F.col("starttime") >= '2012-09-01') & (F.col("starttime") < '2012-10-01')

result_df = (
    bookings_df
    .filter(in_september_2012)
    .groupBy("facid")
    .agg(F.sum("slots").alias("Total Slots"))
    .orderBy("Total Slots")
)

result_df.show()


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Total slots booked per facility per month, for bookings starting in 2012.
# The F alias avoids shadowing Python's built-in sum() in the shared kernel.
result_df = (
    bookings_df
    .filter(F.year(F.col("starttime")) == 2012)
    .groupBy("facid", F.month(F.col("starttime")).alias("month"))
    .agg(F.sum("slots").alias("Total Slots"))
    .orderBy("facid", "month")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Single-row result: the number of distinct memid values across all bookings.
distinct_members = countDistinct("memid").alias("Count of Distinct memid")
result_df = bookings_df.agg(distinct_members)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)
members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# Make sure starttime is compared as a timestamp, whatever the CSV schema
# inference produced.
bookings_df = bookings_df.withColumn("starttime", F.col("starttime").cast("timestamp"))

filtered_bookings_df = bookings_df.filter(F.col("starttime") >= '2012-09-01')

# Each member's earliest booking on or after 2012-09-01, ordered by member id.
# The F alias avoids shadowing Python's built-in min() in the shared kernel.
result_df = (
    filtered_bookings_df
    .join(members_df, on=["memid"], how="inner")
    .groupBy("surname", "firstname", "memid")
    .agg(F.min("starttime").alias("starttime"))
    .orderBy("memid")
)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# One column, "name", holding "<surname>, <firstname>" for every member.
name_column = concat_ws(', ', col("surname"), col("firstname")).alias("name")
result_df = members_df.select(name_column)

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

facilities_df = spark.read.csv("path_to_cd_facilities.csv", header=True, inferSchema=True)

# Facilities whose name starts with "tennis", ignoring case.
# Column.rlike() takes only a pattern and has no caseSensitive argument, so the
# original call raised a TypeError. Case-insensitivity is requested with the
# Java regex inline flag (?i) instead.
result_df = facilities_df.filter(col("name").rlike("(?i)^tennis"))

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# Member ids and phone numbers for members whose telephone value contains
# "(" or ")" anywhere.
phone_has_parens = col("telephone").rlike('[()]')
result_df = members_df.where(phone_has_parens).select(["memid", "telephone"])

result_df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, count

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# "surname" is a cd.members column, so read the same members placeholder path
# the other cells use (the generic "source_file.csv" placeholder was inconsistent).
members_df = spark.read.csv("path_to_cd_members.csv", header=True, inferSchema=True)

# Number of members per first letter of surname, in letter order.
# substring() is 1-based, so (1, 1) takes the first character.
result_df = (
    members_df
    .withColumn("letter", substring(members_df["surname"], 1, 1))
    .groupBy("letter")
    .agg(count("*").alias("count"))
    .orderBy("letter")
)

result_df.show()

# No spark.stop() here. Every cell shares the session returned by getOrCreate(),
# so stopping it mid-notebook would tear down what the following cells use.

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# One row per day from 2012-10-01 to 2012-10-31 inclusive, in column "ts".
# sequence() already yields the dates in order, so exploding it directly is
# enough. Recomputing each date as DATE_ADD(start, position) via posexplode
# produced the same values.
sql_query = """
    SELECT explode(sequence(to_date('2012-10-01'), to_date('2012-10-31'), interval 1 day)) AS ts
"""

result_df = spark.sql(sql_query)

result_df.show()

# No spark.stop() here. Every cell shares the session returned by getOrCreate(),
# so stopping it mid-notebook would tear down what the following cells use.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_trunc, count

spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# "starttime" is a cd.bookings column, so read the same bookings placeholder path
# the other cells use (the generic "source_file.csv" placeholder was inconsistent).
bookings_df = spark.read.csv("path_to_cd_bookings.csv", header=True, inferSchema=True)

# Number of bookings per month. date_trunc("month", ...) truncates each starttime
# to the start of its month, which serves as the grouping key.
result_df = (
    bookings_df
    .withColumn("month", date_trunc("month", bookings_df["starttime"]))
    .groupBy("month")
    .agg(count("*").alias("count"))
    .orderBy("month")
)

result_df.show()

# Release the session. Any cell run after this one re-creates it via getOrCreate().
spark.stop()