In [1]:
# spark session

import os
import pyspark

conf = pyspark.SparkConf()
# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')

# conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)
sc

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/14 17:35:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

<pyspark.sql.context.SQLContext at 0x7cf1056b2a70>

In [5]:
# in Python
BakeryData = spark\
    .read\
    .option("inferSchema","true")\
    .option("header","true")\
    .csv("hw2_data/Bakery.csv")

                                                                                

In [6]:
BakeryData

DataFrame[Date: date, Time: timestamp, Transaction: int, Item: string]

In [12]:
#Q1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, count, date_format

# Filter the data for the specified time range (11AM to 1PM)
filtered_data = BakeryData.filter((hour(col("Time")) >= 11) & (hour(col("Time")) < 13))

# Group by item and date, and count the number of transactions
result = filtered_data.groupBy("Item", date_format(col("Date"), "yyyy-MM-dd").alias("day"))\
                      .agg(count("Transaction").alias("qty"))\
                      .orderBy("day", "Item")


result.show()

# result.write.csv("jwb10028_hw2_q1.csv", header=True)

                                                                                

+--------------------+----------+---+
|                Item|       day|qty|
+--------------------+----------+---+
|              Basket|2016-10-30|  1|
|               Bread|2016-10-30| 11|
|              Coffee|2016-10-30| 13|
|Ella's Kitchen Po...|2016-10-30|  1|
|            Frittata|2016-10-30|  2|
|               Fudge|2016-10-30|  1|
|   Hearty & Seasonal|2016-10-30|  1|
|       Hot chocolate|2016-10-30|  2|
|                 Jam|2016-10-30|  2|
|               Juice|2016-10-30|  2|
|           Medialuna|2016-10-30|  3|
|              Muffin|2016-10-30|  5|
|                NONE|2016-10-30|  3|
|        Scandinavian|2016-10-30|  8|
|                Soup|2016-10-30|  1|
|             Tartine|2016-10-30|  1|
|                 Tea|2016-10-30|  5|
|    Victorian Sponge|2016-10-30|  2|
|               Bread|2016-10-31|  6|
|                Cake|2016-10-31|  3|
+--------------------+----------+---+
only showing top 20 rows



                                                                                

In [19]:
#Q2

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, concat_ws, lit, date_format, dayofweek
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, collect_list

# Define the daypart based on time
bakery_data = BakeryData.withColumn("DayPart", 
                                     when((col("Time") >= "06:00:00") & (col("Time") < "12:00:00"), "Morning")
                                     .when((col("Time") >= "12:00:00") & (col("Time") < "18:00:00"), "Afternoon")
                                     .when((col("Time") >= "18:00:00") & (col("Time") < "00:00:00"), "Evening")
                                     .otherwise("Night"))

# Define the day type based on date
bakery_data = bakery_data.withColumn("DayOfWeek", dayofweek(col("Date")))\
                         .withColumn("DayType", 
                                     when(col("DayOfWeek").isin(1, 7), "Weekend")
                                     .otherwise("Weekday"))

grouped_data = bakery_data.groupBy("DayPart", "DayType", "Item")\
                          .agg(count("Transaction").alias("qty"))

# Window specification for ranking items within each DayPart and DayType
windowSpec = Window.partitionBy("DayPart", "DayType").orderBy(col("qty").desc())

# Rank the items and filter for the top 3
ranked_data = grouped_data.withColumn("rank", row_number().over(windowSpec))\
                          .filter(col("rank") <= 3)

# Collect the top 3 items into a single column
result = ranked_data.groupBy("DayPart", "DayType")\
                    .agg(collect_list("Item").alias("TopItems"))

# Convert the list of top items into a single string column
result = result.withColumn("Top3Items", concat_ws(", ", col("TopItems")))

# Select and show the final result
final_result = result.select("DayPart", "Top3Items", "DayType")
final_result.show(truncate=False)

# final_result.write.csv("hw2_results/hw2_q2.csv", header=True)

                                                                                

+---------+------------------------------------------+-------+
|DayPart  |Top3Items                                 |DayType|
+---------+------------------------------------------+-------+
|Afternoon|Coffee, Bread, Tea                        |Weekday|
|Afternoon|Coffee, Bread, Tea                        |Weekend|
|Morning  |Coffee, Bread, Pastry                     |Weekday|
|Morning  |Coffee, Bread, Pastry                     |Weekend|
|Night    |Coffee, Bread, Fudge                      |Weekday|
|Night    |Tshirt, Afternoon with the baker, Postcard|Weekend|
+---------+------------------------------------------+-------+



                                                                                

In [8]:
#Q3
RestaurantData = spark\
    .read\
    .option("inferSchema","true")\
    .option("header","true")\
    .json("hw2_data/Restaurants_in_Durham_County_NC.json")

RestaurantData

                                                                                

DataFrame[datasetid: string, fields: struct<closing_date:string,est_group_desc:string,geolocation:array<double>,hours_of_operation:string,id:string,insp_freq:bigint,opening_date:string,premise_address1:string,premise_address2:string,premise_city:string,premise_name:string,premise_phone:string,premise_state:string,premise_zip:string,risk:bigint,rpt_area_desc:string,seats:bigint,sewage:string,smoking_allowed:string,status:string,transitional_type_desc:string,type_description:string,water:string>, geometry: struct<coordinates:array<double>,type:string>, record_timestamp: string, recordid: string]

In [10]:
RestaurantData.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

In [13]:
# Corrected code based on the actual column name
entity_counts = RestaurantData.groupBy("fields.rpt_area_desc").count()
entity_counts.show()
entity_counts.write.csv("hw2_results/hw2_q3.csv", header=True)

                                                                                

+--------------------+-----+
|       rpt_area_desc|count|
+--------------------+-----+
|  Bed&Breakfast Home|    4|
|        Summer Camps|    4|
|        Institutions|   30|
|   Local Confinement|    2|
|         Mobile Food|  147|
|    School Buildings|   89|
|         Summer Food|  242|
|      Swimming Pools|  420|
|            Day Care|  173|
|Tattoo Establishm...|   36|
|    Residential Care|  154|
|   Bed&Breakfast Inn|    2|
|      Adult Day Care|    5|
|             Lodging|   62|
|        Food Service| 1093|
+--------------------+-----+



                                                                                

In [29]:
#Q4
# Define the schema explicitly
schema = "Country STRING, `1980` DOUBLE, `1990` DOUBLE, `2000` DOUBLE, `2010` DOUBLE"

PopulationData = spark\
    .read\
    .schema(schema)\
    .csv("hw2_data/populationbycountry19802010millions.csv")

PopulationData.printSchema()
PopulationData.show(200)

root
 |-- Country: string (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1990: double (nullable = true)
 |-- 2000: double (nullable = true)
 |-- 2010: double (nullable = true)

+--------------------+----------+----------+----------+----------+
|             Country|      1980|      1990|      2000|      2010|
+--------------------+----------+----------+----------+----------+
|                NULL|    1980.0|    1981.0|    1982.0|    1983.0|
|       North America| 320.27638| 324.44694| 328.62014| 332.72487|
|             Bermuda|   0.05473|   0.05491|   0.05517|   0.05551|
|              Canada|   24.5933|      24.9|   25.2019|   25.4563|
|           Greenland|   0.05021|   0.05103|   0.05166|   0.05211|
|              Mexico|  68.34748|  69.96926|   71.6409|  73.36288|
|Saint Pierre and ...|   0.00599|   0.00601|   0.00605|   0.00607|
|       United States| 227.22468| 229.46571| 231.66446| 233.79199|
|Central & South A...| 293.05856| 299.43033| 305.95253| 312.51136|
|      

In [30]:
from pyspark.sql.functions import col, expr

# Filter out non-country rows (assuming non-country rows have 'World' or 'Region' in the Country column)
PopulationData = PopulationData.filter(~col("Country").rlike("World|Region"))

# Calculate the percentage increase between 1990 and 2000
PopulationData = PopulationData.withColumn("percentage_increase", 
                                           ((col("2000") - col("1990")) / col("1990")) * 100)

# Select the required columns and sort by the percentage increase
top_5_countries = PopulationData.select("Country", "percentage_increase")\
                                .orderBy(col("percentage_increase").desc())\
                                .limit(5)

# Show the results
top_5_countries.show()
top_5_countries.write.csv("hw2_results/hw2_q4.csv", header=True)

+--------------------+-------------------+
|             Country|percentage_increase|
+--------------------+-------------------+
|      Western Sahara| 11.115105327485802|
|United Arab Emirates|  6.340911640429277|
|        Saudi Arabia|  5.949170947157791|
|                Oman|  5.129210650575041|
|   Equatorial Guinea|  5.040841857384645|
+--------------------+-------------------+



In [4]:
#Q5
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower, col, split, explode, count

# Initialize Spark session
spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Load the text files from the directory
text_files = spark.read.text("hw2_data/hw1text/*.txt")

# Show the initial data (optional)
text_files.show(truncate=False)


24/07/14 14:35:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
# Normalize to lower case and replace characters not in [0-9a-z] with a space
cleaned_text = text_files.withColumn("value", lower(col("value")))
cleaned_text = cleaned_text.withColumn("value", regexp_replace(col("value"), "[^0-9a-z]", " "))

words = cleaned_text.withColumn("word", explode(split(col("value"), "\s+")))
# Filter out empty words
words = words.filter(words.word != "")

In [7]:
# Perform word count
word_count = words.groupBy("word").agg(count("word").alias("count")).orderBy("count", ascending=False)

# Show the result
word_count.show(truncate=False)
word_count.write.csv("hw2_results/hw2_q5.csv", header=True)

                                                                                

+----+------+
|word|count |
+----+------+
|the |163547|
|to  |89046 |
|p   |78664 |
|of  |75568 |
|and |72730 |
|in  |56782 |
|a   |53198 |
|for |29770 |
|that|28852 |
|is  |27601 |
|on  |24485 |
|s   |23615 |
|with|19575 |
|are |19417 |
|it  |18231 |
|be  |17998 |
|as  |17796 |
|have|16188 |
|at  |15965 |
|we  |15754 |
+----+------+
only showing top 20 rows



                                                                                

In [2]:
#Q6
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower, col, split, concat_ws, lag, explode, lit, size, expr
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("BigramCount").getOrCreate()

# Load the text files from the directory
text_files = spark.read.text("hw2_data/hw1text/*.txt")

text_files.show(truncate=False)

24/07/14 17:36:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
# Process text: clean and tokenize
cleaned_text = text_files.select(regexp_replace(lower(col("value")), "[^a-zA-Z\\s]", "").alias("cleaned_text"))
words = cleaned_text.select(split(col("cleaned_text"), "\s+").alias("words"))

words.show()

+--------------------+
|               words|
+--------------------+
|                  []|
|[, h, covid, chil...|
|[, p, economynext...|
|[, p, the, colomb...|
|[, h, over, of, u...|
|[, h, the, virus,...|
|[, h, dozens, of,...|
|[, p, by, now, yo...|
|[, h, share, this...|
|[, h, with, muchn...|
|[, p, staffers, o...|
|[, h, is, change,...|
|[, h, for, the, c...|
|[, p, nursing, ho...|
|[, p, the, new, c...|
|[, p, saudi, arab...|
|[, h, government,...|
|[, h, new, york, ...|
|[, h, john, ball,...|
|[, h, from, battl...|
+--------------------+
only showing top 20 rows



In [10]:
# Filter out empty strings and explode to create individual words
filtered_words = words.select(explode(expr("filter(words, x -> x != '')")).alias("word"))

filtered_words.show()

[Stage 11:>                                                         (0 + 1) / 1]

+---------------+
|           word|
+---------------+
|              h|
|          covid|
|       children|
|        grabbed|
|             in|
|       takoradi|
|            for|
|        hawking|
|              p|
|            the|
|          joint|
|       security|
|            and|
|sekonditakoradi|
|   metropolitan|
|       assembly|
|           stma|
|     monitoring|
|           team|
|           have|
+---------------+
only showing top 20 rows



                                                                                

In [12]:
from pyspark.sql.functions import lead

# Create bigrams
bigrams = filtered_words.withColumn("next_word", lead(col("word"), 1).over(Window.orderBy(lit(1)))) \
                        .filter(col("next_word").isNotNull()) \
                        .select(concat_ws(" ", col("word"), col("next_word")).alias("bigram"))

bigrams.show()

24/07/14 17:46:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:46:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:46:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:46:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:46:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+
|              bigram|
+--------------------+
|             h covid|
|      covid children|
|    children grabbed|
|          grabbed in|
|         in takoradi|
|        takoradi for|
|         for hawking|
|           hawking p|
|               p the|
|           the joint|
|      joint security|
|        security and|
| and sekonditakoradi|
|sekonditakoradi m...|
|metropolitan asse...|
|       assembly stma|
|     stma monitoring|
|     monitoring team|
|           team have|
|       have arrested|
+--------------------+
only showing top 20 rows



                                                                                

In [14]:
# Count occurrences of each bigram
bigram_counts = bigrams.groupBy("bigram").count()

# Get the 6 most common bigrams
most_common_bigrams = bigram_counts.orderBy(col("count").desc()).limit(6)

# Show the results
most_common_bigrams.show(truncate=False)
most_common_bigrams.write.csv("hw2_results/hw2_q6.csv", header=True)

24/07/14 17:49:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-------+-----+
|bigram |count|
+-------+-----+
|of the |17509|
|in the |12931|
|p the  |10409|
|to the |8458 |
|for the|5595 |
|on the |5025 |
+-------+-----+



24/07/14 17:49:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/14 17:49:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                