# Mini-project: Spark

This notebook applies Apache Spark to big data analytics. I created it for my Big Data class as my first hands-on Spark project. It analyzes two real-world datasets:
- Techcrunch blog posts (2010)
- Car sales transactions

We use both the DataFrame/SQL API and the RDD API to answer practical analytics questions.


## Part 1: Techcrunch posts

We use the `techcrunch_posts.csv` input file. It is a comma-separated values (CSV) file containing blog posts crawled from the Techcrunch blog community in 2010. Each row represents a blog post and has the following columns:

* `postId`: the ID of the post record in Techcrunch.
* `title`: the title of the blog post.
* `author`: the name of the blogger who authored the post.
* `authorId`: the ID of the blogger who authored the post.
* `comments`: the number of comments of the blog post.
* `content`: the full text of the blog post.
* `url`: the URL of the blog post.
* `date`: the publication date of the blog post.
* `inlinks`: the number of incoming links to this blog post.

Dataset source: [https://www.kaggle.com/datasets/lakritidis/identifying-influential-bloggers-techcrunch](https://www.kaggle.com/datasets/lakritidis/identifying-influential-bloggers-techcrunch)


In [None]:
# We read the input file here
import pyspark
from pyspark.sql import SparkSession

sc = SparkSession \
        .builder \
        .master('local[4]') \
        .appName("Assignment2") \
        .getOrCreate()

print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)

df = sc.read.option('delimiter', ',').option('header', 'true')\
    .csv("file://///home/hadoop/Documents/techcrunch_posts.csv")

# Always a good idea to take a peek
df.printSchema()
df.show(5)

25/05/27 17:46:15 WARN Utils: Your hostname, BDCC resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/05/27 17:46:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/27 17:46:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 3.5.0
PySpark Version: 3.5.0


### Most active dates
We find the most active dates, measured by the number of submitted posts. The result lists the publication dates in descending order of post count.
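As a rough illustration of the group-count-sort logic behind this query, here is a plain-Python sketch that needs no Spark. The `sample_dates` list and the `most_active_dates` helper are made up for this example and are not part of the dataset or the notebook.

```python
from collections import Counter

# Made-up publication dates, one entry per post (not taken from the dataset).
sample_dates = [
    "2009-09-14", "2009-09-15", "2009-09-14",
    "2009-10-13", "2009-09-14", "2009-09-15",
]

def most_active_dates(dates):
    """Return (date, num_posts) pairs, busiest date first."""
    # Counter groups equal dates; most_common() sorts by count, descending.
    return Counter(dates).most_common()

ranking = most_active_dates(sample_dates)
print(ranking)
```

The SQL version does the same thing with `group by date` and `order by num_posts desc`, except that Spark distributes the counting across partitions.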



In [None]:
df.createOrReplaceTempView("blog")

df_sql_1 = sc.sql("""
    select date, count(*) as num_posts 
    from blog 
    group by date 
    order by num_posts desc
    """)

df_sql_1.show(df_sql_1.count())

+--------------------+---------+
|                date|num_posts|
+--------------------+---------+
|          2009-09-14|       51|
|          2009-09-15|       49|
|          2009-10-13|       42|
|          2009-09-09|       42|
|          2009-06-24|       42|
|          2009-11-18|       40|
|          2010-02-11|       40|
|          2009-12-09|       40|
|          2010-02-16|       39|
|          2010-01-20|       38|
|          2009-09-10|       38|
|          2010-02-23|       37|
|          2009-12-15|       37|
|          2009-10-21|       37|
|          2009-11-10|       37|
|          2009-10-20|       37|
|          2009-10-19|       37|
|          2009-10-15|       37|
|          2009-09-22|       37|
|          2010-01-06|       37|
|          2009-07-07|       37|
|          2009-11-03|       36|
|          2009-07-23|       36|
|          2010-03-10|       35|
|          2010-01-13|       35|
|          2009-12-07|       35|
|          2010-02-24|       35|
|         

### Most cited authors

We list the authors in descending order of citations, where an author's citations are the total inlinks across all of their posts.



In [None]:
df_sql_2 = sc.sql("""
    select author, sum(inlinks) as citations 
    from blog 
    group by author 
    order by citations desc
    """)

df_sql_2.show(df_sql_2.count())

+--------------------+---------+
|              author|citations|
+--------------------+---------+
|   Michael Arrington|  86919.0|
|     Erick Schonfeld|  28430.0|
|          MG Siegler|  15208.0|
|       Jason Kincaid|  14409.0|
|        Duncan Riley|  11626.0|
|       Robin Wauters|   9479.0|
|           Leena Rao|   6520.0|
|    Mark Hendrickson|   5371.0|
|Marshall Kirkpatrick|   3935.0|
|          John Biggs|   3347.0|
|       Nick Gonzalez|   3255.0|
|        Guest Author|   2307.0|
|       Greg Kumparak|   2245.0|
|        Mike Butcher|   1930.0|
|              tcnikc|   1471.0|
|          Sarah Lacy|   1089.0|
|       Don Reisinger|    918.0|
|      Devin Coldewey|    884.0|
|          Matt Burns|    843.0|
|       Steve Gillmor|    834.0|
|         Serkan Toto|    817.0|
|    Natali Del Conte|    551.0|
|            Peter Ha|    516.0|
|         Brian Solis|    485.0|
|       Ouriel Ohayon|    483.0|
|          Roi Carthy|    441.0|
|           Paul Carr|    427.0|
|     Nich

### Average length of posts

We compute the average length (in words) of the blog posts per author and return a list of authors with their average post length.

We sort the output in descending order of post length, so the top record shows the author who, on average, publishes the longest posts.
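An average cannot be merged pairwise across partitions, but a running `(word_total, post_count)` pair can, which is why the computation carries both numbers and divides only at the end. The plain-Python sketch below shows that idea on made-up posts; `posts`, `to_pair`, `merge`, and `average_words_per_author` are hypothetical names used only for this illustration.

```python
# Made-up (author, content) records; None stands for a missing post body.
posts = [
    ("alice", "one two three"),
    ("bob", "one two"),
    ("alice", "one"),
    ("bob", None),
]

def to_pair(content):
    """Map one post to (word_count, 1), treating missing content as 0 words."""
    return (len(content.split()) if content else 0, 1)

def merge(a, b):
    """Combine two partial (word_total, post_count) pairs."""
    return (a[0] + b[0], a[1] + b[1])

def average_words_per_author(records):
    totals = {}
    for author, content in records:
        pair = to_pair(content)
        totals[author] = merge(totals[author], pair) if author in totals else pair
    # Divide only once all partial sums have been merged.
    averages = {author: words / count for author, (words, count) in totals.items()}
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)

result = average_words_per_author(posts)
print(result)
```

Because `merge` is associative and commutative, the partial pairs can be combined in any order, which is the property `reduceByKey` relies on.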



In [None]:
rdd = df.rdd # I did this with RDD just for fun

# First emit word count and post count per author
def map_content(row):
    author = row['author']
    content = row['content']
    if content: # let's avoid surprises
        wordcount = len(content.split())
    else:
        wordcount = 0
    return author, (wordcount, 1)    
    
authors_and_counts_rdd = rdd.map(map_content)

# Then aggregate by adding word counts and post counts per author
authors_counts_reduced_rdd = authors_and_counts_rdd.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

# Then calculate averages per author
def calculate_avg(pair):
    author = pair[0]
    total_wordcount, total_posts = pair[1]
    average = total_wordcount / total_posts
    return author, average

authors_and_avg_rdd = authors_counts_reduced_rdd.map(calculate_avg)

# Finally swap each pair to (value, key) so we can sort by value, then swap back to print
rdd_rev = authors_and_avg_rdd.map(lambda x: ( x[1], x[0] )) 
rdd_rev_sort = rdd_rev.sortByKey(False)
rdd_sort = rdd_rev_sort.map(lambda x: ( x[1], x[0] )) 
rdd_sort.collect()

                                                                                

[('TechCrunch Europe', 2455.0),
 ('Orli Yakuel', 1898.8333333333333),
 ('Brian Solis', 1532.4117647058824),
 ('Eric Clemons', 1481.0),
 ('Susan Wu', 1453.0),
 ('Paul Carr', 1321.267857142857),
 ('Ryan Carson', 1289.0),
 ('Thomas Scott McKenzie', 1223.0),
 ('Guest Author', 1199.26582278481),
 ('Vivek Wadhwa', 1147.741935483871),
 ('Gregor Hochmuth', 1049.0),
 ('Muhammad Saleem', 1044.0),
 ('David Sacks', 1002.0),
 ('LondonVC', 937.5),
 ('Paul Sloan', 823.5),
 ('Sarah Lacy and Paul Carr', 793.0),
 ('Steve Gillmor', 761.7666666666667),
 ('nik', 758.0),
 ('Frank Gruber', 711.875),
 ('Marc Benioff', 711.5),
 ('Gagan Biyani', 692.8846153846154),
 ('Sarah Lacy', 670.4927536231884),
 ('Vivian Wu', 650.0),
 ('Ryan Stewart', 644.0),
 ('Scott Merrill', 622.4705882352941),
 ('Jay Donovan', 614.2),
 ('Devin Coldewey', 590.1013513513514),
 ('Ciara Byrne', 573.0),
 ('tcnikc', 568.3030303030303),
 ('Markus Goebel', 555.2857142857143),
 ('Serkan Toto', 550.3263157894737),
 ('Doug Aamoth', 533.930232558

### H-index

We compute the $h$-index of each author.

**$h$-index definition:** An author has an $h$-index equal to $h$ if $h$ of their $N$ blog posts have at least $h$ inlinks each and the other $(N - h)$ blog posts have $\leq h$ inlinks each. For instance, an $h$-index of 17 means that the blogger has authored at least 17 posts that have each been cited at least 17 times. If the blogger's 18th most cited post was cited only 17 times, the $h$-index remains 17. If the blogger's 18th most cited post was cited 18 or more times, the $h$-index would rise to 18.
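To make the definition concrete, here is a small standalone sketch that computes the $h$-index of a single list of inlink counts. The function name `h_index` and the sample numbers are illustrative assumptions, not values from the dataset.

```python
def h_index(inlinks):
    """Largest h such that at least h posts have h or more inlinks each."""
    ranked = sorted(inlinks, reverse=True)  # most cited post first
    h = 0
    for rank, n_links in enumerate(ranked, start=1):
        if n_links >= rank:
            h = rank  # the top `rank` posts all have at least `rank` inlinks
        else:
            break     # later posts have even fewer inlinks, so h cannot grow
    return h

# The example from the definition: 17 well-cited posts plus an 18th post.
print(h_index([20] * 17 + [17]))  # 18th post has only 17 inlinks
print(h_index([20] * 17 + [18]))  # 18th post has 18 inlinks
```

Sorting in descending order means the loop can stop at the first post whose inlink count falls below its rank.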



In [None]:
# First emit author and number of inlinks per post
def map_citations(row):
    author = row['author']
    try:
        inlinks = int(row['inlinks'])
    except (TypeError, ValueError):
        inlinks = 0  # I noticed some strings in the inlinks column; treat those (and nulls) as 0
    return author, inlinks
        
authors_inlinks_rdd = rdd.map(map_citations)

# Then combine author inlinks into a list 
def to_list(inlinks):
    return [inlinks]

def append(list_of_inlinks, inlinks):
    list_of_inlinks.append(inlinks)
    return list_of_inlinks

def extend(l1, l2):
    l1.extend(l2)
    return l1

authors_list_inlinks = authors_inlinks_rdd.combineByKey(to_list, append, extend)

# Then compute h_index for each list of inlinks
def map_h_index(pair):
    author = pair[0]
    inlinkList = pair[1]
    sortedInlinks = sorted(inlinkList, reverse=True) # a way to determine the top most cited posts
    h_index = 0
    for h, n_links in enumerate(sortedInlinks): # if the top h posts have at least h inlinks, keep h
        if n_links >= (h+1): # +1 because enumerate starts at 0
            h_index = h+1
        else:
            break
    return author, h_index
    
authors_h_index = authors_list_inlinks.map(map_h_index)

# Finally swap each pair to (value, key) so we can sort by value, then swap back to print
rdd_rev = authors_h_index.map(lambda x: ( x[1], x[0] )) 
rdd_rev_sort = rdd_rev.sortByKey(False)
rdd_sort = rdd_rev_sort.map(lambda x: ( x[1], x[0] )) 
rdd_sort.collect()

[('Michael Arrington', 91),
 ('Erick Schonfeld', 53),
 ('MG Siegler', 43),
 ('Jason Kincaid', 39),
 ('Duncan Riley', 36),
 ('Robin Wauters', 34),
 ('Mark Hendrickson', 31),
 ('Leena Rao', 27),
 ('Marshall Kirkpatrick', 26),
 ('Guest Author', 25),
 ('Nick Gonzalez', 23),
 ('John Biggs', 22),
 ('Greg Kumparak', 22),
 ('Sarah Lacy', 21),
 ('tcnikc', 21),
 ('Mike Butcher', 18),
 ('Don Reisinger', 16),
 ('Steve Gillmor', 15),
 ('Devin Coldewey', 14),
 ('Serkan Toto', 14),
 ('Ouriel Ohayon', 13),
 ('Paul Carr', 12),
 ('Brian Solis', 12),
 ('Matt Burns', 12),
 ('Peter Ha', 12),
 ('Roi Carthy', 11),
 ('Natali Del Conte', 11),
 ('Vivek Wadhwa', 11),
 ('Doug Aamoth', 9),
 ('Henry Work', 9),
 ('Blake Robinson', 9),
 ('Nicholas Deleon', 9),
 ('Calley Nye', 8),
 ('Gagan Biyani', 7),
 ('Scott Merrill', 6),
 ('Frank Gruber', 6),
 ('Jeff Widman', 6),
 ('Dan Kimerling', 5),
 ('Andrew Meyer', 5),
 ('tccameronc', 5),
 ('Neil Kjeldsen', 5),
 ("Steve O'Hear", 4),
 ('alain', 4),
 ('Jack McKenna', 4),
 ('Dav

## Part 2: Car prices

We use the `car_prices.csv` input file. It is a comma-separated values (CSV) file with information about vehicle sales transactions. The dataset comprises 558837 rows and 16 columns and occupies around 85 MB.

Each row denotes a car sale. The columns represent:

1. `year`: The manufacturing year of the vehicle.
2. `make`: The brand or manufacturer of the vehicle.
3. `model`: The specific model of the vehicle.
4. `trim`: Additional designation for the vehicle model.
5. `body`: The body type of the vehicle (e.g., SUV, Sedan).
6. `transmission`: The type of transmission in the vehicle (e.g., automatic).
7. `vin`: Vehicle Identification Number, a unique code for each vehicle.
8. `state`: The state where the vehicle is registered.
9. `condition`: Condition of the vehicle, possibly rated on a scale.
10. `odometer`: The mileage or distance traveled by the vehicle.
11. `color`: Exterior color of the vehicle.
12. `interior`: Interior color of the vehicle.
13. `seller`: The entity selling the vehicle.
14. `mmr`: Manheim Market Report, possibly indicating the estimated market value of the vehicle.
15. `sellingprice`: The price at which the vehicle was sold.
16. `saledate`: The date and time when the vehicle was sold.

Dataset source: [https://www.kaggle.com/datasets/syedanwarafridi/vehicle-sales-data](https://www.kaggle.com/datasets/syedanwarafridi/vehicle-sales-data)

Like almost all real-world datasets, this one has several data quality issues. One of them is missing values: dates, brands, prices, colors, and other fields may be absent. Imputing missing values is out of scope for this project. Consequently, in **all** implementations, **we simply ignore every row that has a missing value in columns 1, 2, 10, 11, 12, 15, or 16.** Fortunately, only a small portion of the records is lost with this strategy.
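The row-dropping rule can be sketched in plain Python as follows. This is only an illustration: the sample rows are made up, `has_required_values` is a hypothetical helper, and treating both `None` and the empty string as missing is an assumption of this sketch.

```python
# 1-based positions of the columns that must be present, as listed above.
REQUIRED_COLUMNS = (1, 2, 10, 11, 12, 15, 16)

def has_required_values(row, required=REQUIRED_COLUMNS):
    """True if none of the required columns is missing in this row."""
    return all(row[i - 1] not in (None, "") for i in required)

# Made-up 16-column rows (placeholder values, not real records).
complete = ["2015", "Kia", "Sorento", "LX", "SUV", "automatic", "vin-1", "ca",
            "5", "16639", "white", "black", "seller-a", "20500", "21500",
            "sale-date-1"]
missing_color = list(complete)
missing_color[10] = None   # column 11 (color) is required, so this row is dropped
missing_trim = list(complete)
missing_trim[3] = None     # column 4 (trim) is not required, so this row is kept

rows = [complete, missing_color, missing_trim]
kept = [row for row in rows if has_required_values(row)]
print(len(kept))
```

In Spark the same rule is a single `dropna` call with the `subset` argument naming those columns.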


In [None]:
# We read the input file here

df = sc.read.option('delimiter', ',').option('header', 'false')\
    .csv("file://///home/hadoop/Documents/car_prices.csv")

df = df.toDF(*[str(i) for i in range(1,17)]) # name the columns '1'..'16' by position; much cleaner

df = df.dropna(subset = ['1','2','10','11','12','15','16'])

# Always a good idea to take a peek
df.printSchema()
df.show(5)

### Compute Yearly Statistics

Here we are interested in computing yearly statistics about car sales. In particular, we compute the:

* number of vehicles per year,
* total value of the sold cars per year,
* average distance travelled by the sold cars per year and
* average age of the sold cars per year.
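The age of a car needs the sale year, which the notebook takes from the `saledate` string as four characters starting at 1-based position 12. The sketch below mirrors that in plain Python, assuming the dates look like `Tue Dec 16 2014 12:30:00 GMT-0800 (PST)`; that layout and the helper names `sale_year` and `age_at_sale` are assumptions made for this illustration.

```python
def sale_year(saledate):
    """Extract the sale year, or None if the slice is not a 4-digit number."""
    # Spark's substring(col, 12, 4) is 1-based; the Python slice is 0-based.
    text = saledate[11:15]
    return int(text) if text.isdigit() else None

def age_at_sale(year_made, saledate):
    """Age in years at the time of sale, or None if the sale year is unknown."""
    year = sale_year(saledate)
    return None if year is None else year - int(year_made)

example = "Tue Dec 16 2014 12:30:00 GMT-0800 (PST)"  # assumed layout
print(sale_year(example), age_at_sale("2010", example))
```

A string that is too short, such as a header cell, yields an empty slice and is skipped, much like the notebook's filter on empty `year_sold` values.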


In [None]:
# To be honest it feels cleaner to use dataframes and pure SQL for this one. 

from pyspark.sql.functions import col, substring

# Extract sale year, drop empty, cast to int
df = df.withColumn("year_sold", substring(col('16'), 12, 4))
df = df.filter(col('year_sold') != "")
df = df.withColumn("year_sold", col('year_sold').cast('int'))

# Compute age
df = df.withColumn('year_made', col('1').cast('int'))
df = df.withColumn('age', col('year_sold') - col('year_made'))

# Cast price and distance
df = df.withColumn('price', col('15').cast('float'))
df = df.withColumn('distance', col('10').cast('float'))

# Pure SQL
df.createOrReplaceTempView("cars")
df_sql_1 = sc.sql("""
    select year_sold, count(*) as vehicles_sold, sum(price) as total_value, avg(distance) as average_distance, avg(age) as average_age
    from cars 
    group by year_sold
    """)
df_sql_1.show()


+---------+------+-------------+-----------------+-----------------+
|year_sold| sales|   sum(price)|    avg(distance)|         avg(age)|
+---------+------+-------------+-----------------+-----------------+
|     2015|494246|6.891529523E9|65949.50182702541|4.729416120717214|
|     2014| 53435| 6.05225395E8|82384.64897539066|5.330158136053148|
+---------+------+-------------+-----------------+-----------------+



                                                                                

### Compute Yearly Statistics per Brand

Here we are interested in the performance of car sales **per brand, per year**. We compute the same statistics as above, but additionally grouped by brand. More specifically, we compute the:

* number of vehicles per year, per brand,
* total value of the sold cars per year, per brand,
* average distance travelled by the sold cars per year, per brand, and
* average age of the sold cars per year, per brand.



In [None]:
df_sql_2 = sc.sql("""
    select year_sold, `2` as brand, count(*) as vehicles_sold, sum(price) as total_value, avg(distance) as average_distance, avg(age) as average_age
    from cars 
    group by year_sold, brand 
    order by brand, year_sold
    """)

df_sql_2.show(df_sql_2.count())

+---------+-------------+-----+-------------+------------------+------------------+
|year_sold|        brand|sales|   sum(price)|     avg(distance)|          avg(age)|
+---------+-------------+-----+-------------+------------------+------------------+
|     2014|        Acura|  708|    9340800.0| 89072.75847457627|6.0790960451977405|
|     2015|        Acura| 5190|    7.33675E7| 85369.27919075145| 6.532562620423892|
|     2015| Aston Martin|   25|    1370300.0|          26603.64|              7.44|
|     2014|         Audi|  573|    9480850.0| 77641.15706806282| 5.900523560209424|
|     2015|         Audi| 5292| 1.07327625E8| 64790.82879818594| 5.441421012849585|
|     2014|          BMW| 2067|  3.9252415E7| 72388.04160619255| 5.523463957426221|
|     2015|          BMW|18558| 4.00744073E8| 63604.38657182886| 5.458454574846427|
|     2014|      Bentley|   18|    1199450.0| 49674.27777777778| 6.777777777777778|
|     2015|      Bentley|   98|    7427200.0|37323.142857142855|6.8979591836

### Large-scale analytics: Feature Exploration

Here we will perform a part of what is called feature exploration. Feature exploration focuses on quantifying the impact of a particular feature on the target variable. While such analyses typically include all features, in this example we are only interested in finding out how the (exterior) color of a car affects its sales. More specifically, we compute:

* number of vehicles per (exterior) color, and
* the total value of the sold cars per color.

We sort the output in descending order of number of sales.
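As a small plain-Python illustration of this aggregation, the sketch below counts sales and sums prices per color while skipping a dash placeholder, as the notebook's query does. The `sales` records and the `stats_by_color` helper are made up for this example.

```python
from collections import defaultdict

PLACEHOLDER = "\u2014"  # the dash the notebook's SQL filters out of the color column

# Made-up (exterior_color, selling_price) records.
sales = [
    ("black", 21500.0),
    ("white", 18000.0),
    ("black", 9500.0),
    (PLACEHOLDER, 12000.0),
    ("red", 7000.0),
]

def stats_by_color(records):
    """Return (color, vehicles_sold, total_value) rows, best-selling color first."""
    stats = defaultdict(lambda: [0, 0.0])
    for color, price in records:
        if PLACEHOLDER in color:
            continue  # skip rows whose color is only a placeholder
        stats[color][0] += 1
        stats[color][1] += price
    rows = [(color, n, total) for color, (n, total) in stats.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)

color_stats = stats_by_color(sales)
print(color_stats)
```

Counting and summing in the same pass is what lets one `group by` produce both columns.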



In [None]:
df_sql_3 = sc.sql("""
    select `11` as exterior_color, count(*) as vehicles_sold, sum(price) as total_value
    from cars 
    where `11` not like '%—%' 
    group by exterior_color 
    order by vehicles_sold desc
    """)

df_sql_3.show()

+---------+------+-------------+
|ext_color| sales|   sum(price)|
+---------+------+-------------+
|    black|109135|1.698768823E9|
|    white|104276|1.550578349E9|
|   silver| 81857| 9.70710912E8|
|     gray| 81834|1.144426034E9|
|     blue| 50051|   5.771055E8|
|      red| 42754| 5.46834181E8|
|     gold| 10989|  9.2645042E7|
|    green| 10944|  9.2929937E7|
|    beige|  8980|  8.5443618E7|
| burgundy|  8796| 1.05848944E8|
|    brown|  6635| 1.02576862E8|
|   orange|  2006|  2.6092013E7|
|   purple|  1526|  1.8569503E7|
|off-white|  1421|  2.1759603E7|
|   yellow|  1235|   1.557388E7|
| charcoal|   473|    7697651.0|
|turquoise|   225|    1943402.0|
|     pink|    42|     493350.0|
|     lime|    15|     198850.0|
+---------+------+-------------+



### Exploratory Analysis

Finally, we perform a part of what is called exploratory analysis. This process applies unsupervised techniques to a data collection with the aim of discovering potentially useful information. In this example we are interested in finding the combination of exterior and interior colors that sells the most. More specifically, we compute:

* the number of vehicles per pair of exterior/interior color.

We sort the output in descending order of number of sales.


In [None]:
df_sql_4 = sc.sql("""
    select `11` as exterior_color, `12` as interior_color, count(*) as vehicles_sold 
    from cars 
    where `11` not like '%—%' and `12` not like '%—%' 
    group by exterior_color, interior_color 
    order by vehicles_sold desc
    """)

df_sql_4.show(df_sql_4.count())

+---------+---------+-----+
|ext_color|int_color|sales|
+---------+---------+-----+
|    black|    black|60646|
|     gray|    black|41058|
|   silver|    black|39645|
|    white|    black|36526|
|    white|     gray|34937|
|   silver|     gray|34173|
|     gray|     gray|29716|
|    black|     gray|25625|
|     blue|     gray|19582|
|      red|    black|18522|
|     blue|    black|18268|
|    white|    beige|15195|
|      red|     gray|12477|
|    white|      tan|10942|
|    black|    beige| 9610|
|    black|      tan| 7066|
|      red|    beige| 5606|
|     blue|    beige| 5343|
|     gray|    beige| 4680|
|      red|      tan| 4008|
|     blue|      tan| 3830|
|     gold|    beige| 3798|
|    green|     gray| 3604|
|     gold|      tan| 2998|
| burgundy|    black| 2794|
|     gray|      tan| 2710|
|   silver|    beige| 2685|
|    beige|      tan| 2546|
|    beige|    beige| 2518|
|    green|    black| 2377|
| burgundy|     gray| 2326|
|    green|    beige| 2238|
|    white|    brown