In [None]:
# prelim setup - mount Google Drive into the Colab runtime at /content/drive
# NOTE(review): the datasets below are read from the relative path "data/...",
# not from /content/drive - confirm the working directory actually contains them
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# install pyspark
!pip install pyspark

In [3]:
# import statements
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import matplotlib.pyplot as plt

# create (or reuse) the Spark session that every later cell works through
spark = (
    SparkSession.builder
    .appName("Dataset Analysis")
    .getOrCreate()
)

In [4]:
# read the coffee survey CSV into a PySpark DataFrame
# (first row holds the column names; column types are inferred from the data)
dataset_path = "data/coffee_dataset.csv"
df = spark.read.csv(
    path=dataset_path,
    inferSchema=True,
    header=True,
)

# EXPLORATORY DATA ANALYSIS (EDA)

In [None]:
# exploratory data analysis - getting to know schema, format, type of data, missing values, etc.
df.printSchema()

# first 10 rows: show() prints them explicitly; a bare `df.head(10)` in the
# middle of a cell is evaluated and then silently discarded by the notebook
df.show(10)

# last 10 rows: this is the final expression of the cell, so it is displayed
df.tail(10)

Exploring the dataset by plotting the distribution of the age ranges of the respondents who voted. First, the data is cleaned so that the age column only contains proper string values (those of the form "... years old").

In [None]:
# keep only rows whose age is present, is not the literal "NA",
# and is a proper "... years old" label
cleaned_df = (
    df.na.drop(subset=["age"])
    .filter(col("age") != "NA")
    .filter(col("age").contains("years old"))
)

# number of respondents per age group, largest group first
age_group_count = (
    cleaned_df.groupBy("age")
    .count()
    .orderBy(col("count").desc())
)
age_group_count.show()

# bar chart of the per-group counts (the aggregate is small, so it is
# brought to pandas for plotting)
age_group_df = age_group_count.toPandas()
fig, ax = plt.subplots()
ax.bar(age_group_df["age"], age_group_df["count"])
ax.set_xlabel("Age group")
ax.set_ylabel("Count")
ax.set_title("Histogram of Count per Age Group")
plt.xticks(rotation=45)
plt.show()

Results show that the voting distribution by age is concentrated toward the younger end of the range (i.e. right-skewed when the groups are ordered numerically rather than as strings), because the two groups that voted the most are 25-34 years old and 35-44 years old.

The same approach gives an overall picture of other columns in the dataset. For example, the distribution of votes in the 'favorite' column can be plotted in the same way.

In [None]:
# keep only rows that actually carry a favorite value (not NULL, not "NA")
cleaned_df = (
    df.na.drop(subset=["favorite"])
    .filter(col("favorite") != "NA")
)

# number of votes per favorite coffee type, most popular first
favorite_count = (
    cleaned_df.groupBy("favorite")
    .count()
    .orderBy(col("count").desc())
)
favorite_count.show()

# bar chart of the per-type counts
favorite_count_df = favorite_count.toPandas()
fig, ax = plt.subplots()
ax.bar(favorite_count_df["favorite"], favorite_count_df["count"])
ax.set_xlabel("Favorite coffee type")
ax.set_ylabel("Count")
ax.set_title("Histogram of Count per Coffee Type")
plt.xticks(rotation=45)
plt.show()

The analysis shows that Pourover is by far the most frequently chosen favorite coffee.

# SPECIFIC ANALYSIS

Exploring relationship between age group and brew form preference


In [None]:
# rows with a usable age value (neither NULL nor the literal "NA")
filtered_age = df.filter(col("age").isNotNull() & (col("age") != "NA"))
filtered_age.show(10)

# of those, rows that also have a usable brew value
filtered_age_brew = filtered_age.filter(
    col("brew").isNotNull() & (col("brew") != "NA")
)
filtered_age_brew.show(10)

# cleaned dataset used by the following cells
filtered_df = filtered_age_brew

In [None]:
# sanity check: count rows per distinct age value to see which values occur
# and whether any invalid ones slipped through the NULL / "NA" filters
grouped_age = filtered_df.groupBy("age").count()
grouped_age.show(10)

# invalid strings did show up, so keep only the values containing "years old"
# and recount
filtered_df = filtered_df.where(col("age").contains("years old"))
grouped_age = filtered_df.groupBy("age").count()
grouped_age.show()

INSIGHT: displayed results show that greatest amount of surveyed are in the 25-34 year old pool

In [None]:
# register the cleaned data as a temp view so it can be queried with spark.sql
filtered_df.createOrReplaceTempView("coffee_table")

# For every age group, count the votes per brew method and keep only the
# most-voted method.
# ROW_NUMBER() makes the "top row per age" selection explicit. Sorting and
# then calling dropDuplicates(["age"]) would NOT be reliable here:
# dropDuplicates gives no guarantee about which of the duplicate rows it
# keeps, so it can return a brew method that is not the most popular one.
# Ties on count are broken alphabetically by brew so the result is
# deterministic from run to run.
sql_query = """
SELECT age, brew, `count`
FROM (
    SELECT age, brew, `count`,
           ROW_NUMBER() OVER (PARTITION BY age ORDER BY `count` DESC, brew) AS rn
    FROM (
        SELECT age, brew, COUNT(*) AS `count`
        FROM coffee_table
        GROUP BY age, brew
    ) AS brew_counts
) AS ranked
WHERE rn = 1
ORDER BY age
"""

# executing the SQL query utilizing spark.sql: one row per age group
# with columns age, brew, count
sql_df_result = spark.sql(sql_query)

# displaying result
sql_df_result.show()

Results show that for every age group up to and including 45-54 years old, 'Pour over' is the brew type with the highest count.

In [None]:
# keep only the age groups whose top brew method was "Pour over": the
# 55-64 and >65 groups are excluded because it was not their most popular one
sql_df_result = sql_df_result.filter(
    (col("age") != ">65 years old") & (col("age") != "55-64 years old")
)

# bar chart of the top-method vote count per remaining age group
pd_df = sql_df_result.toPandas()
fig, ax = plt.subplots()
ax.bar(pd_df["age"], pd_df["count"], color="red")
ax.set_title("Counts of 'Pour over' most popular type by Age Group")
ax.set_xlabel("Age")
ax.set_ylabel("Count of 'Pour over' votes")
plt.xticks(rotation=45)
plt.show()

The plot shows that 25-34 years old group had the greatest amount of counts for 'Pour Over'. Further investigation is needed as to whether it might be due to that group having the highest amount of votes in the survey, or if they disproportionately vote for the 'Pour Over' method.

Exploring Relationship between Age Group and Roast Level

In [None]:
# rows with a usable age value (neither NULL nor the literal "NA")
filtered_age = df.filter(col("age").isNotNull() & (col("age") != "NA"))

# of those, rows that also have a usable roast_level value
filtered_age_rl = filtered_age.filter(
    col("roast_level").isNotNull() & (col("roast_level") != "NA")
)
filtered_age_rl.show(10)

# cleaned dataset used by the following cells
filtered_df = filtered_age_rl

In [None]:
# restrict age to the proper "... years old" labels, then count per age group
filtered_df = filtered_df.where(col("age").contains("years old"))
grouped_age = filtered_df.groupBy("age").count()
grouped_age.show()

# count per roast level, most frequent first, to check the values that occur
grouped_rl = (
    filtered_df.groupBy("roast_level")
    .count()
    .orderBy(col("count").desc())
)
grouped_rl.show()

Did not see any unexpected string output for the roast level even when sorting by desc and asc


In [None]:
# register the cleaned data as a temp view so it can be queried with spark.sql
filtered_df.createOrReplaceTempView("coffee_table")

# For every age group, count the votes per roast level and keep only the
# most-voted level.
# ROW_NUMBER() makes the "top row per age" selection explicit. Sorting and
# then calling dropDuplicates(["age"]) would NOT be reliable here:
# dropDuplicates gives no guarantee about which of the duplicate rows it
# keeps, so it can return a roast level that is not the most popular one.
# Ties on count are broken alphabetically by roast_level so the result is
# deterministic from run to run.
sql_query = """
SELECT age, roast_level, `count`
FROM (
    SELECT age, roast_level, `count`,
           ROW_NUMBER() OVER (PARTITION BY age ORDER BY `count` DESC, roast_level) AS rn
    FROM (
        SELECT age, roast_level, COUNT(*) AS `count`
        FROM coffee_table
        GROUP BY age, roast_level
    ) AS roast_counts
) AS ranked
WHERE rn = 1
ORDER BY age
"""

# executing the SQL query utilizing spark.sql: one row per age group
# with columns age, roast_level, count
sql_df_result = spark.sql(sql_query)

# displaying result
sql_df_result.show()

The most popular roast level in every age group was either Light or Medium. Age groups up to and including 35-44 years old had Light as their highest-voted roast level, while the groups aged 45 and over preferred Medium.

In [None]:
# keep only the age groups whose top roast level was Light, i.e. drop the
# three oldest groups (45-54, 55-64 and >65 years old)
sql_df_result_light = sql_df_result.filter(
    (col("age") != ">65 years old")
    & (col("age") != "55-64 years old")
    & (col("age") != "45-54 years old")
)

# bar chart of the top-level vote count per remaining age group
pd_df = sql_df_result_light.toPandas()
fig, ax = plt.subplots()
ax.bar(pd_df["age"], pd_df["count"], color="green")
ax.set_title("Counts of 'Light Roast' most popular type by Age Group")
ax.set_xlabel("Age")
ax.set_ylabel("Count of 'Light Roast' votes")
plt.xticks(rotation=45)
plt.show()

Again, the 25-34 year old age group range dominated. Again, it is not certain as to whether this group prefers light roast more so than other groups or if it is due to their increased number of votes given in general (more participation).

In [None]:
# keep only the age groups whose top roast level was Medium, i.e. drop the
# four youngest groups (<18, 18-24, 25-34 and 35-44 years old)
sql_df_result_medium = sql_df_result.filter(
    (col("age") != "18-24 years old")
    & (col("age") != "25-34 years old")
    & (col("age") != "35-44 years old")
    & (col("age") != "<18 years old")
)

# bar chart of the top-level vote count per remaining age group
pd_df = sql_df_result_medium.toPandas()
fig, ax = plt.subplots()
ax.bar(pd_df["age"], pd_df["count"], color="brown")
ax.set_title("Counts of 'Medium Roast' most popular type by Age Group")
ax.set_xlabel("Age")
ax.set_ylabel("Count of 'Medium Roast' votes")
plt.xticks(rotation=45)
plt.show()

The 45-54 year old group range had the most votes for Medium roast out of the groups that voted the most for Medium roast. This group was the third highest group by count that voted so that may also have an impact on these results (skewed potentially).

Exploring whether there is a correlation between age and the self-reported level of expertise. In this section, each age-range string is transformed into the midpoint of its range to obtain a numerical value for age. The exact mapping is not strictly rule-based; it is only meant to explore whether a potential relationship exists, which could then be investigated further. Since the under-18 and over-65 groups are outliers, they will be excluded.

In [None]:
# clean data (age): drop missing / malformed values and the two open-ended
# brackets (">65" and "<18"), which are excluded from this analysis
cleaned_df = (
    df.na.drop(subset=["age"])
    .filter(col("age") != "NA")
    .filter(col("age").contains("years old"))
    .filter(col("age") != ">65 years old")
    .filter(col("age") != "<18 years old")
)

# clean data (expertise): keep rows whose expertise parses as a number in [0, 10]
cleaned_df = (
    cleaned_df.na.drop(subset=["expertise"])
    .filter(col("expertise") != "NA")
    .filter(col("expertise").cast("double").between(0, 10))
)

# transforming data: midpoint of every remaining age bracket.
# All values are floats so the CASE expression below has a single numeric type.
str_to_num = {
    "18-24 years old": 21.0,
    "25-34 years old": 29.5,
    "35-44 years old": 39.5,
    "45-54 years old": 49.5,
    "55-64 years old": 59.5
}

# Build ONE typed CASE WHEN expression for the whole mapping. Mixing numeric
# branches with `.otherwise(col("age"))` (a string) would make every branch
# depend on implicit numeric/string coercion and on a later cast back to
# double; here the result is numeric from the start and any label without a
# midpoint simply becomes NULL.
age_midpoint = None
for old_value, new_value in str_to_num.items():
    branch = when(col("age") == old_value, new_value)
    age_midpoint = branch if age_midpoint is None else branch.otherwise(age_midpoint)

cleaned_df = cleaned_df.withColumn("age", age_midpoint)

# both analysis columns as doubles, ready for plotting and correlation
final_df = cleaned_df.select(
    col("age").cast("double").alias("age"),
    col("expertise").cast("double").alias("expertise"),
)
final_df.show()

final_df_pandas = final_df.toPandas()

# scatter plot
plt.figure()
plt.scatter(final_df_pandas["age"], final_df_pandas["expertise"])
plt.xlabel("Age")
plt.ylabel("Expertise")
plt.title("Age vs Expertise")
plt.show()

# calculate correlation coefficient between age midpoint and expertise
correlation_coefficient = final_df.stat.corr("age", "expertise")
print(correlation_coefficient)

Correlation coefficient was very low, showing that there is likely not a strong correlation between age group and expertise noted. Further investigation may be required to explore this relationship more.

# ETL with PySpark to form recommendation

There is also another dataset that contains the names of the best coffee shop locations for many types of coffee (espresso, cappuccino, etc.) as well as for the roast levels. It is therefore possible to use an ETL process to extract data from this dataset, transform it together with the earlier coffee voting dataset, and produce a new dataset that contains the age groups, their preferred coffee, and the recommended coffee shop. This result is then loaded into a new dataset file, essentially making a recommendation of which coffee shop to visit based on preferences.

In [None]:
# extract: read the coffee shop CSV into a PySpark DataFrame
# (first row holds the column names; column types are inferred from the data)
coffee_shop_dataset_path = "data/coffee_shops.csv"
df_shops = spark.read.csv(
    path=coffee_shop_dataset_path,
    inferSchema=True,
    header=True,
)
df_shops.show()

In [None]:
# transform: rename the shop columns so they line up with the survey result
# ("Known for" becomes the join key, "Name of shop" the recommendation)
df_shops = (
    df_shops
    .withColumnRenamed("Known for", "roast_level")
    .withColumnRenamed("Name of shop", "recommended_shop")
)

# DataFrame API version: join the per-age roast result with the shops on
# roast_level and keep only the three columns of the recommendation dataset
transformed_df = (
    sql_df_result
    .join(df_shops, on="roast_level")
    .select("age", "roast_level", "recommended_shop")
)
transformed_df.show()

# OPTIONAL - the same join expressed in SQL over two temp views;
# this rebinds transformed_df to the SQL result
sql_df_result.createOrReplaceTempView("AgeRoast_table")
df_shops.createOrReplaceTempView("Shops_table")
sql_query = """
SELECT AgeRoast_table.age, AgeRoast_table.roast_level, Shops_table.recommended_shop
FROM AgeRoast_table
JOIN Shops_table
ON AgeRoast_table.roast_level = Shops_table.roast_level
"""
transformed_df = spark.sql(sql_query)
transformed_df.show()

In [None]:
# load: convert the recommendations to pandas and write them to a CSV file.
# The pandas copy gets its own name so that `transformed_df` stays a Spark
# DataFrame and this cell can be re-run; rebinding `transformed_df` to the
# pandas frame would make a second run fail on `.toPandas()`.
recommendations_pd = transformed_df.toPandas()

# index=False: the pandas row index is not part of the dataset and would
# otherwise be written as an extra unnamed first column
recommendations_pd.to_csv("recommended_coffee_shops.csv", index=False)

In [None]:
# end spark session - keep this as the last cell: cells that use `spark` or a
# Spark DataFrame need the session-creation cell re-run before they work again
spark.stop()