In [None]:
# Start times of every booking made by the member named David Farrell.
from pyspark.sql import SparkSession

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# Both sides of the join are loaded here so the cell runs on a fresh kernel.
# (The members table was previously used without ever being loaded.)
bookings_df = spark.table("bookings")
members_df = spark.table("members")

# Alternative when the tables are not registered in the metastore:
# bookings_df = spark.read.csv("/FileStore/tables/bookings.csv", header=True, inferSchema=True)
# members_df = spark.read.csv("/FileStore/tables/members.csv", header=True, inferSchema=True)

result_df = (
    members_df
    # Inner join: only members that have at least one booking survive.
    .join(bookings_df, members_df.memid == bookings_df.memid)
    .filter((members_df.firstname == 'David') & (members_df.surname == 'Farrell'))
    # Columns are referenced through their parent frame because memid
    # exists on both sides of the join.
    .select(bookings_df.starttime)
)

result_df.show()

In [None]:
# Start times of bookings made on 2012-09-21 for facilities whose name
# contains "Tennis", earliest first.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# Source tables
bookings_df = spark.table("bookings")
facilities_df = spark.table("facilities")

# Row predicates, named so the pipeline below reads as a sentence.
starts_on_target_day = col("starttime").cast("date") == '2012-09-21'
is_tennis_facility = col("name").like("%Tennis%")

# Attach facility details to each booking (inner join on facid).
bookings_with_facility_df = bookings_df.join(
    facilities_df,
    bookings_df.facid == facilities_df.facid,
)

result_df = (
    bookings_with_facility_df
    .filter(starts_on_target_day & is_tennis_facility)
    .select("starttime", "name")
    .orderBy("starttime")
)

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# Every member together with the member who recommended them (if any),
# ordered by the member's surname and first name.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")

# Two aliases of the same table: m1 is the member, m2 is the recommender.
member_side = members_df.alias("m1")
recommender_side = members_df.alias("m2")
recommender_matches = col("m1.recommendedby") == col("m2.memid")

output_columns = [
    col("m1.memid"),
    col("m1.surname").alias("member_surname"),
    col("m1.firstname").alias("member_firstname"),
    col("m1.recommendedby"),
    col("m2.surname").alias("recommendedby_surname"),
    col("m2.firstname").alias("recommendedby_firstname"),
]

# Left outer join keeps members that have no recommender; their
# recommendedby_* columns come back as null.
joined_df = member_side.join(recommender_side, recommender_matches, "left_outer")
result_df = joined_df.select(*output_columns).orderBy("member_surname", "member_firstname")

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# Distinct (member name, facility name) pairs for bookings of facilities
# whose name contains "Tennis".
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# Source tables
bookings_df = spark.table("bookings")
facilities_df = spark.table("facilities")
members_df = spark.table("members")

# Output expressions
member_name = concat_ws(' ', col("firstname"), col("surname")).alias("member_name")
facility_name = col("name").alias("facility_name")

# Booking -> facility -> member, both inner joins.
tennis_bookings_df = (
    bookings_df
    .join(facilities_df, bookings_df.facid == facilities_df.facid)
    .join(members_df, bookings_df.memid == members_df.memid)
    .filter(col("name").like("%Tennis%"))
)

# A member may book the same facility many times; distinct() collapses
# those repeats to a single row.
result_df = (
    tennis_bookings_df
    .select(member_name, facility_name)
    .distinct()
    .orderBy("member_name", "facility_name")
)

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# TODO: q5 - the author flagged an unresolved issue with this cell.
# NOTE(review): the single output column holds the recommender's name when
# a member has one and the member's own name otherwise - confirm that this
# is what q5 actually asks for.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, when

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")

# Self-join: m1 is the member, m2 is the member who recommended them.
# The left outer join keeps members without a recommender.
member_side = members_df.alias("m1")
recommender_side = members_df.alias("m2")

members_with_recommender_df = (
    member_side
    .join(recommender_side, col("m1.recommendedby") == col("m2.memid"), "left_outer")
    .select(
        col("m1.memid"),
        col("m1.firstname").alias("member_firstname"),
        col("m1.surname").alias("member_surname"),
        col("m1.recommendedby"),
        col("m2.firstname").alias("recommendedby_firstname"),
        col("m2.surname").alias("recommendedby_surname"),
    )
    .distinct()
    .orderBy("member_firstname", "member_surname")
)

# Full names built from the aliased columns above.
recommender_full_name = concat_ws(' ', col("recommendedby_firstname"), col("recommendedby_surname"))
member_full_name = concat_ws(' ', col("member_firstname"), col("member_surname"))

# Pick the recommender's name when one exists, else the member's own name,
# and keep only that column.
result_df = (
    members_with_recommender_df
    .withColumn(
        "member_and_recommendedby",
        when(col("recommendedby").isNotNull(), recommender_full_name).otherwise(member_full_name),
    )
    .select("member_and_recommendedby")
)

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# Number of recommendations made by each recommending member.
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")

recommendations_count_df = (
    members_df
    # Members nobody recommended carry a null recommendedby. Without this
    # filter they would form a null group whose count is the number of
    # un-recommended members, which is not a recommendation count.
    .filter(col("recommendedby").isNotNull())
    .groupBy("recommendedby")
    .agg(count("*").alias("recommendations_count"))
    .orderBy("recommendedby")
)

# Print without truncating long cell values.
recommendations_count_df.show(truncate=False)

In [None]:
# Total number of slots booked per facility.
from pyspark.sql import SparkSession
# The aggregate is reached through the F namespace: importing `sum` by name
# would replace Python's builtin sum() for the rest of the kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.table("bookings")

# One row per facid with the sum of its booked slots, ordered by facid.
facility_slots_df = (
    bookings_df
    .groupBy("facid")
    .agg(F.sum("slots").alias("total_slots"))
    .orderBy("facid")
)

# Print without truncating long cell values.
facility_slots_df.show(truncate=False)

In [None]:
# Total number of slots booked per facility during September 2012,
# busiest facility first.
from pyspark.sql import SparkSession
# The aggregate is reached through the F namespace: importing `sum` by name
# would replace Python's builtin sum() for the rest of the kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, month, year

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.table("bookings")

# Keep only bookings whose start time falls in September 2012.
september_bookings_df = bookings_df.filter((month("starttime") == 9) & (year("starttime") == 2012))

# One row per facid with the sum of its booked slots, largest total first.
facility_slots_df = (
    september_bookings_df
    .groupBy("facid")
    .agg(F.sum("slots").alias("total_slots"))
    .orderBy(col("total_slots").desc())
)

# Print without truncating long cell values.
facility_slots_df.show(truncate=False)

In [None]:
# Total number of slots booked per facility per month during 2012.
from pyspark.sql import SparkSession
# The aggregate is reached through the F namespace: importing `sum` by name
# would replace Python's builtin sum() for the rest of the kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, month, year

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.table("bookings")

# Keep only bookings whose start time falls in 2012.
year_2012_bookings_df = bookings_df.filter(year("starttime") == 2012)

# One row per (facid, month) with the sum of booked slots. The month is
# derived from starttime inside groupBy and exposed as the "month" column.
facility_slots_per_month_df = (
    year_2012_bookings_df
    .groupBy("facid", month("starttime").alias("month"))
    .agg(F.sum("slots").alias("total_slots"))
    .orderBy("facid", "month")
)

# Print without truncating long cell values.
facility_slots_per_month_df.show(truncate=False)

In [None]:
# Number of distinct memid values that appear in the bookings table.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

bookings_df = spark.table("bookings")

# Each memid is counted once, however many bookings it has.
distinct_bookers = countDistinct("memid").alias("total_members")
total_members_with_bookings = bookings_df.select(distinct_bookers)

# Single-row result
total_members_with_bookings.show()

In [None]:
# Every member with the start time of their earliest booking later than
# 2012-09-01 (null when they have none).
from pyspark.sql import SparkSession
# The aggregate is reached through the F namespace: importing `min` by name
# would replace Python's builtin min() for the rest of the kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")
bookings_df = spark.table("bookings")

# Earliest qualifying start time per memid.
min_booking_time_df = (
    bookings_df
    .filter(col("starttime") > "2012-09-01")
    .groupBy("memid")
    .agg(F.min("starttime").alias("first_booking_after_sep1"))
)

# Left outer join on memid keeps members without a qualifying booking.
# Joining on the column name leaves a single memid column in the output.
result_df = (
    members_df
    .join(min_booking_time_df, "memid", "left_outer")
    .select(
        "memid",
        col("firstname").alias("member_firstname"),
        col("surname").alias("member_lastname"),
        "first_booking_after_sep1"
    )
    .orderBy("memid")
)

result_df.show()

In [None]:
# Member names formatted as "surname, firstname", sorted alphabetically.
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")

# Surname first, then first name, joined by a comma and a space.
formatted_name = concat_ws(', ', col("surname"), col("firstname")).alias("formatted_name")

result_df = members_df.select(formatted_name).orderBy("formatted_name")

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# Facilities whose name starts with "tennis", ignoring letter case.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

facilities_df = spark.table("facilities")

# ilike is the case-insensitive form of like; "%" matches any suffix.
name_starts_with_tennis = col("name").ilike("tennis%")
result_df = facilities_df.where(name_starts_with_tennis)

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# Members whose telephone number contains an opening or closing parenthesis.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

members_df = spark.table("members")

# Either kind of parenthesis is enough to match.
telephone = col("telephone")
has_parenthesis = telephone.contains("(") | telephone.contains(")")

matching_members_df = members_df.filter(has_parenthesis)
result_df = matching_members_df.select("memid", "telephone").orderBy("memid")

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# How many members there are per first letter of the surname.
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, count, col

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# substring positions are 1-based: start at 1, length 1 -> first character.
first_letter = substring("surname", 1, 1)

# Members table with the derived first-letter column attached.
members_df = spark.table("members").withColumn("surname_first_letter", first_letter)

# One row per first letter with the number of members, in letter order.
letter_groups = members_df.groupBy("surname_first_letter")
result_df = letter_groups.agg(count("*").alias("count")).orderBy("surname_first_letter")

# Print without truncating long cell values.
result_df.show(truncate=False)

In [None]:
# One row per calendar day of October 2012, in a timestamp column "date".
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr, lit, sequence, to_date

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

FIRST_DAY = "2012-10-01"
LAST_DAY = "2012-10-31"

# spark.range() only accepts integer bounds (and excludes its end value),
# so it cannot enumerate dates. Instead, sequence() builds an array holding
# every day from FIRST_DAY to LAST_DAY inclusive, and explode() turns that
# array into one row per day. spark.range(1) merely supplies the single
# input row the expression is evaluated on.
days_df = spark.range(1).select(
    explode(
        sequence(to_date(lit(FIRST_DAY)), to_date(lit(LAST_DAY)), expr("interval 1 day"))
    ).alias("date")
)

# A generator such as explode() cannot be nested inside another expression,
# so the cast to timestamp happens in a separate projection.
dates_df = days_df.select(col("date").cast("timestamp").alias("date"))

# show() prints only 20 rows by default; request every generated day.
dates_df.show(dates_df.count(), truncate=False)

In [None]:
# Number of bookings per calendar month.
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month

# Reuse the running session if there is one, otherwise create it.
spark = SparkSession.builder.appName("ETLExample").getOrCreate()

# Load the table here rather than relying on a bookings_df left behind by
# an earlier cell, so the cell also works on a fresh kernel.
bookings_df = spark.table("bookings")

# Derive year and month from starttime under a new name, leaving
# bookings_df as the untouched source table.
bookings_by_month_df = (
    bookings_df
    .withColumn("year", year("starttime"))
    .withColumn("month", month("starttime"))
)

# One row per (year, month) with the number of bookings, in calendar order.
monthly_bookings_count = bookings_by_month_df.groupBy("year", "month").count().sort("year", "month")

# Print without truncating long cell values.
monthly_bookings_count.show(truncate=False)