# Data-Analysis-and-Wrangling-with-Spark-on-AWS-EMR

## Step 0: Set up EMR

### 0.1: The Superfluous Setup

In [None]:
#%%capture
!apt install libkrb5-dev
!pip install sparkmagic
!pip install penngrader-client

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
libkrb5-dev is already the newest version (1.19.2-2ubuntu0.3).
libkrb5-dev set to manually installed.
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.
Collecting sparkmagic
  Downloading sparkmagic-0.21.0.tar.gz (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.3/45.3 kB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting hdijupyterutils>=0.6 (from sparkmagic)
  Downloading hdijupyterutils-0.21.0.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting autovizwidget>=0.6 (from sparkmagic)
  Downloading autovizwidget-0.21.0.tar.gz (9.0 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting requests_kerberos>=0.8.0 (from sparkmagic)
  Downloading requests_kerberos-0.14.0-py2.py3-none-any.whl (11 kB)
Collecting jupyter>=1 (from hdijupyterutils>=0.6->sparkmagi

In [None]:
# Register sparkmagic's %spark / %%spark magics used throughout the notebook.
%load_ext sparkmagic.magics

### 0.2: The Sharp Spark

In [None]:
# NOTE(review): nothing visible in this notebook imports lxml directly — confirm it is still needed.
!pip install lxml



In [None]:
# Create the remote PySpark session ("spark_session") that all %%spark cells run in.
# NOTE(review): the endpoint and credentials are hardcoded placeholders here — never
# commit real credentials in this line. The recorded output shows the connection
# attempt failed ("maximum retry encountered").
%spark add -s spark_session -l python -u http://ec2-111111.compute-1.amazonaws.com -a 111111 -p 1111111 -t Basic_Access

An error was encountered:
Error sending http request and maximum retry encountered.


## Step 1: Data Wrangling, Cleaning, and Shaping

### 1.1: The Stupendous Schema



In [None]:
%%spark

from pyspark.sql.types import *

# Explicit schema for linkedin_small_real.json, passed to spark.read.json below so
# Spark does not have to infer it. Every field is declared nullable.

schema = StructType([
    StructField("_id", StringType(), nullable=True),

    # education: array of school entries; all sub-fields (including start/end) are strings.
    StructField("education", ArrayType(
        StructType([
          StructField("start", StringType(), nullable=True),
          StructField("major", StringType(), nullable=True),
          StructField("end", StringType(), nullable=True),
          StructField("name", StringType(), nullable=True),
          StructField("degree", StringType(), nullable=True),
          StructField("desc", StringType(), nullable=True)
    ])), nullable=True),

    # NOTE(review): "affilition" (sic) must match the JSON key exactly; presumably the
    # dataset itself uses this spelling — verify before "correcting" it.
    StructField("group", StructType([
          StructField("affilition", ArrayType(StringType()), nullable=True),
          StructField("member", StringType(), nullable=True)
    ]), nullable=True),

    StructField("locality", StringType(), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("industry", StringType(), nullable=True),
    StructField("interval", IntegerType(), nullable=True),

    StructField("summary", StringType(), nullable=True),
    StructField("interests", StringType(), nullable=True),
    StructField("overview_html", StringType(), nullable=True),
    # NOTE(review): "specilities" (sic) — same caveat as "affilition" above.
    StructField("specilities", StringType(), nullable=True),
    StructField("homepage", ArrayType(StringType()), nullable=True),
    StructField("honors", ArrayType(StringType()), nullable=True),
    StructField("url", StringType(), nullable=True),
    # also_view: array of {id, url} references to other profiles.
    StructField("also_view", ArrayType(
      StructType([
          StructField("id", StringType(), nullable=True),
          StructField("url", StringType(), nullable=True)
      ])
    ), nullable=True),


    StructField("name", StructType([
        StructField("family_name", StringType(), nullable=True),
        StructField("given_name", StringType(), nullable=True)
    ]), nullable=True),

    # experience: array of jobs. `org` and the string `start` are what section 1.3
    # extracts (start is later parsed from a "Month YYYY" string).
    StructField("experience", ArrayType(
        StructType([
            StructField("org", StringType(), nullable=True),
            StructField("title", StringType(), nullable=True),
            StructField("end", StringType(), nullable=True),
            StructField("start", StringType(), nullable=True),
            StructField("desc", StringType(), nullable=True)
        ])
    ), nullable=True),

    # events: unlike education/experience, start/end here are declared as integers.
    StructField("events", ArrayType(
        StructType([
            StructField("from", StringType(), nullable=True),
            StructField("to", StringType(), nullable=True),
            StructField("title1", StringType(), nullable=True),
            StructField("start", IntegerType(), nullable=True),
            StructField("title2", StringType(), nullable=True),
            StructField("end", IntegerType(), nullable=True)
        ])
    ), nullable=True)


])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1.2: The Languorous Load

#### 1.2.1: Load LinkedIn Dataset

In [None]:
%%spark

# Load the LinkedIn profiles using the explicit `schema` defined above.
# NOTE(review): "s3a://linkedin_small_real.json" has no bucket component (the file
# name would be parsed as the bucket) — presumably the bucket was redacted; restore it.
linkedin_data_sdf = spark.read.json("s3a://linkedin_small_real.json", schema=schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Preview the first 5 rows to see what the data looks like in tabular form.
linkedin_data_sdf.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             _id|           education|               group|            locality|              skills|            industry|interval|             summary|           interests|       overview_html|         specilities|homepage|honors|                 url|           also_view|                name|          experience|              events|
+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------

In [None]:
import pandas as pd

In [None]:
%%spark

# Expose the profiles to SQL under the name used by the queries that follow.
linkedin_data_sdf.createOrReplaceTempView("linkedin_data")

# Quick look at the first 10 profiles in _id order.
query = '''SELECT *
           FROM linkedin_data
           ORDER BY _id
           LIMIT 10'''
answer_sdf = spark.sql(query)

answer_sdf.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               _id|           education|               group|            locality|              skills|            industry|interval|             summary|           interests|       overview_html|         specilities|homepage|honors|                 url|           also_view|                name|          experience|              events|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------+------+--------------------+--------------------+--------------------+--------------------+--

In [None]:
# Convert to pandas: copy the remote Spark DataFrame `answer_sdf` into this kernel.
%spark -o answer_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.2.2: SQL refresher [3 Pts]


In [None]:
%%spark

# (_id, industry, family_name) for every profile that has both an industry and a
# family name, sorted by _id/industry/family_name and capped at 100 rows.
industry_family_name_sql = """
    SELECT
        _id,
        industry,
        name.family_name AS family_name
    FROM
        linkedin_data
    WHERE
        industry IS NOT NULL AND
        name.family_name IS NOT NULL
    ORDER BY
        _id ASC,
        industry ASC,
        family_name ASC
    LIMIT 100
"""
industry_family_name_df = spark.sql(industry_family_name_sql)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Convert to pandas: copy the remote `industry_family_name_df` into this kernel.
%spark -o industry_family_name_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.2.3: Load Stock Prices Data

In [None]:
%%spark

from pyspark.sql.types import *

# Column layout of stocks.csv, in file order; every column nullable.
# NOTE(review): Volume is a 32-bit IntegerType. If any daily volume exceeds
# 2,147,483,647 it presumably cannot be parsed as an int — LongType would be safer.
stocks_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in [
        ("Date", StringType()),
        ("Open", FloatType()),
        ("High", FloatType()),
        ("Low", FloatType()),
        ("Close", FloatType()),
        ("Volume", IntegerType()),
        ("OpenInt", IntegerType()),
        ("org", StringType()),
    ]
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Load the entire `stocks.csv` dataset from the S3 bucket into a Spark DataFrame called `stocks_sdf`.

In [None]:

%%spark

# Load stocks.csv (first line is a header) with the explicit stocks_schema.
# NOTE(review): "s3a://stocks.csv" has no bucket component — presumably redacted;
# restore the real bucket.
stocks_sdf = spark.read.csv("s3a://stocks.csv", header="true", schema=stocks_schema)

# Make it queryable as `stocks`, then preview 10 rows.
stocks_sdf.createOrReplaceTempView('stocks')
query = '''SELECT *
           FROM stocks'''
answer_stocks_sdf = spark.sql(query)
answer_stocks_sdf.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+------+------+------+--------+-------+---+
|      Date|  Open|  High|   Low| Close|  Volume|OpenInt|org|
+----------+------+------+------+------+--------+-------+---+
|1999-11-18|30.713|33.754|27.002|29.702|66277506|      0|  A|
|1999-11-19|28.986|29.027|26.872|27.257|16142920|      0|  A|
|1999-11-22|27.886|29.702|27.044|29.702| 6970266|      0|  A|
|1999-11-23|28.688|29.446|27.002|27.002| 6332082|      0|  A|
|1999-11-24|27.083|28.309|27.002|27.717| 5132147|      0|  A|
|1999-11-26|27.594|28.012|27.509|27.807| 1832635|      0|  A|
|1999-11-29|27.676| 28.65| 27.38|28.432| 4317826|      0|  A|
|1999-11-30| 28.35|28.986|27.634| 28.48| 4567146|      0|  A|
|1999-12-01| 28.48|29.324|28.273|28.986| 3133746|      0|  A|
|1999-12-02|29.532|30.375|29.155|29.786| 3252997|      0|  A|
+----------+------+------+------+------+--------+-------+---+
only showing top 10 rows

In [None]:
# Convert to pandas: copy the remote `answer_stocks_sdf` into this kernel.
%spark -o answer_stocks_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.2.4: Calculate Percentage Change

In [None]:
%%spark
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

open_px = F.col("Open")
close_px = F.col("Close")
# Open-to-close move in percent; when() without otherwise() leaves it null when
# Open is 0, avoiding a division by zero.
pct_change = F.when(open_px != 0.0, (close_px - open_px) / open_px * 100.0)

answer_sdf = stocks_sdf.withColumn("percentage_change", pct_change).orderBy("Date", "org")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 10 rows of the Date/org-sorted percentage-change frame.
answer_sdf.createOrReplaceTempView("test_1_2_4")
test_1_2_4_sdf = spark.sql("SELECT * FROM test_1_2_4 LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_1_2_4_sdf` into this kernel as a pandas DataFrame.
%spark -o test_1_2_4_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1.3: Cleaning LinkedIn Data

#### 1.3.1: Adding Experience

In [None]:
%%spark

from pyspark.sql.functions import explode, col

# One row per job entry in each profile's `experience` array.
experience_rows = linkedin_data_sdf.select(explode("experience").alias("experience"))

# Keep (org, start_date) and drop jobs missing either field.
raw_start_dates_sdf = (
    experience_rows
    .select(col("experience.org").alias("org"),
            col("experience.start").alias("start_date"))
    .dropna(subset=["org", "start_date"])
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 20 (org, start_date) rows. start_date is still the raw string
# here, so DESC orders it lexicographically rather than chronologically.
raw_start_dates_sdf.createOrReplaceTempView("test_1_3_1")
test_1_3_1_sdf = spark.sql("SELECT * FROM test_1_3_1 ORDER BY org ASC, start_date DESC LIMIT 20")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_1_3_1_sdf` into this kernel as a pandas DataFrame.
%spark -o test_1_3_1_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.3.2: Filtering on Date [5 Pts]

In [None]:
%%spark

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

from pyspark.sql.functions import regexp_extract, to_date, col, lit

# Only strings that are exactly "<Full month name> <4-digit year>" are kept;
# anything else extracts to "" and fails to parse.
MONTH_YEAR_PATTERN = r"^(January|February|March|April|May|June|July|August|September|October|November|December) (\d{4})$"
parsed_start = to_date(regexp_extract("start_date", MONTH_YEAR_PATTERN, 0), "MMMM yyyy")

# Inclusive window on the parsed date (the first of the month).
window_start = to_date(lit("2000-01-01"))
window_end = to_date(lit("2011-12-01"))
in_window = (
    col("start_date_formatted").isNotNull()
    & (col("start_date_formatted") >= window_start)
    & (col("start_date_formatted") <= window_end)
)

filtered_start_dates_sdf = (
    raw_start_dates_sdf
    .withColumn("start_date_formatted", parsed_start)
    .filter(in_window)
    .drop("start_date")
    .withColumnRenamed("start_date_formatted", "start_date")
    .dropna(subset=["org", "start_date"])
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: the 10 latest and 10 earliest start dates (UNION drops any overlap),
# re-sorted ascending by date, then org.
filtered_start_dates_sdf.createOrReplaceTempView("test_1_3_2")
test_1_3_2_sdf = spark.sql("SELECT * FROM ((SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_3_2 ORDER BY start_date DESC, org DESC LIMIT 10) UNION (SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_3_2 ORDER BY start_date ASC, org ASC LIMIT 10)) ORDER BY start_date ASC, org ASC")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_1_3_2_sdf` into this kernel as a pandas DataFrame.
%spark -o test_1_3_2_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1.4: Cleaning Stock Data

#### 1.4.1: Adding Company Names

In [None]:
%%spark

# Ticker symbol -> company name, for the companies this analysis joins on.
ticker_to_name_dict = dict([
    ('NOK', 'Nokia'),
    ('UN', 'Unilever'),
    ('BP', 'BP'),
    ('JNJ', 'Johnson & Johnson'),
    ('ORCL', 'Oracle'),
    ('BAC', 'Bank of America'),
    ('PG', 'Procter & Gamble'),
    ('CGEMY', 'Capgemini'),
    ('GS', 'Goldman Sachs'),
    ('C', 'Citi'),
    ('IBM', 'IBM'),
    ('CS', 'Credit Suisse'),
    ('DB', 'Deutsche Bank'),
    ('MSFT', 'Microsoft'),
    ('HPE', 'Hewlett-Packard'),
    ('ERIC', 'Ericsson'),
    ('BCS', 'Barclays Capital'),
    ('GSK', 'GlaxoSmithKline'),
])


def ticker_to_name(ticker):
    """Return the company name for `ticker`; None for a None or unmapped ticker."""
    return None if ticker is None else ticker_to_name_dict.get(ticker)

# Expose ticker_to_name to Spark SQL as TICKER_TO_NAME (string result); 1.4.2 calls it
# via expr("TICKER_TO_NAME(org)"). The function object is captured at registration time.
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<function ticker_to_name at 0x7fde222f27a0>

In [None]:
%%spark

# Sanity check on tickers that are NOT in the mapping: both lookups return None,
# stringified to "None" so the row holds plain string columns.
# The list deliberately does not reuse the name `ticker_to_name`: that would rebind
# the Python function defined above, so re-running this cell (or any later direct
# call) would fail with "'list' object is not callable".
ticker_to_name_check = [(str(ticker_to_name("GOOGL")), str(ticker_to_name("TSLA")))]
columns = ['A', 'B']
dataframe = spark.createDataFrame(ticker_to_name_check, columns)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `dataframe` into this kernel as a pandas DataFrame.
%spark -o dataframe

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.4.2: Wrangling stocks data [5 Pts]

In [None]:
%%spark

from pyspark.sql.functions import col, to_date, expr, lit

org_name = expr("TICKER_TO_NAME(org)")
parsed_date = to_date(col("Date"), "yyyy-MM-dd")
# Inclusive date window; the string literals are cast to dates by Spark.
in_window = (col("date") >= lit("2001-01-01")) & (col("date") <= lit("2012-12-04"))

# Keep only mapped tickers inside the window, as (company name, date, Close).
filter_1_stocks_sdf = (
    stocks_sdf
    .withColumn("org_name", org_name)
    .filter(col("org_name").isNotNull())
    .withColumn("date", parsed_date)
    .filter(in_window)
    .select(col("org_name").alias("org"), "date", "Close")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: 10 latest + 10 earliest rows by date (UNION drops duplicates),
# re-sorted by date ASC, org DESC.
filter_1_stocks_sdf.createOrReplaceTempView("test_1_4_2")
test_1_4_2_sdf = spark.sql("SELECT * FROM ((SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_1_4_2 ORDER BY date DESC, org DESC LIMIT 10) UNION (SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_1_4_2 ORDER BY date ASC, org ASC LIMIT 10)) ORDER BY date ASC, org DESC")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_1_4_2_sdf` into this kernel as a pandas DataFrame.
%spark -o test_1_4_2_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Step 2: Analysis on LinkedIn Data

### 2.1: Counting Employees

In [None]:
%%spark

from pyspark.sql import functions as F

# Number of profiles whose job at `org` started on each (month-granular) start_date.
start_dates_sdf = (
    filtered_start_dates_sdf
    .groupBy("org", "start_date")
    .agg(F.count("*").alias("num_employees"))
)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark

# Test snapshot: the 10 (org, start_date) groups with the most hires.
start_dates_sdf.createOrReplaceTempView("test_2_1")
test_2_1_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') as start_date, num_employees FROM test_2_1 ORDER BY num_employees DESC, org DESC, start_date ASC LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_2_1_sdf` into this kernel as a pandas DataFrame.
%spark -o test_2_1_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 2.2: Reshape DataFrame

In [None]:
%%spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, col, sum as Fsum
from functools import reduce

# Month abbreviations in calendar order; position i (1-based) is month number i.
month_names = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]

# Split each start date into calendar year and month number (1-12).
start_dates_extended_sdf = (
    start_dates_sdf
    .withColumn("year", year(col("start_date")))
    .withColumn("month", month(col("start_date")))
)

# One row per (org, year) with one column per month number holding that month's
# hire count. Listing the pivot values fixes the column set and order even when a
# month has no hires; missing months become 0.
pivot_sdf = (
    start_dates_extended_sdf
    .groupBy("org", "year")
    .pivot("month", list(range(1, 13)))
    .agg(Fsum("num_employees").alias("num_employees"))
    .fillna(0)
)

# Rename pivot columns "1".."12" to "jan_hired".."dec_hired".
# The loop variable is deliberately not called `month`: in the shared %%spark
# session that would rebind the pyspark `month` function imported above.
for month_num, month_abbr in enumerate(month_names, start=1):
    pivot_sdf = pivot_sdf.withColumnRenamed(str(month_num), f"{month_abbr}_hired")

# Total hires in the year = sum of the twelve monthly columns.
hired_cols = [col(f"{abbr}_hired") for abbr in month_names]
total_num_expr = reduce(lambda a, b: a + b, hired_cols)
raw_hire_train_sdf = pivot_sdf.withColumn("total_num", total_num_expr)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: the 20 org-years with the most total hires.
raw_hire_train_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY total_num DESC, org DESC, year ASC LIMIT 20")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_2_2_sdf` into this kernel as a pandas DataFrame.
%spark -o test_2_2_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 2.3: Filtering on Company Size

In [None]:
%%spark
# Keep only org-years with at least this many hires in total.
MIN_TOTAL_HIRES = 20
hire_train_sdf = raw_hire_train_sdf.where(col("total_num") >= MIN_TOTAL_HIRES)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 10 org-years (by org, then year) that passed the size filter.
hire_train_sdf.createOrReplaceTempView("test_2_3")
test_2_3_sdf = spark.sql("SELECT * FROM test_2_3 ORDER BY org ASC, year ASC LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_2_3_sdf` into this kernel as a pandas DataFrame.
%spark -o test_2_3_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Step 3: Analyzing Stock Data

### 3.1: Average Closing Price

In [None]:
%%spark

from pyspark.sql import functions as F

# Monthly average closing price per company, rounded to 3 decimals, sorted by
# org/year/month. Functions are referenced through `F` rather than imported by
# name: importing pyspark's `round` would shadow Python's builtin round() for
# every later cell in the shared %%spark session.
filter_2_stocks_sdf = (
    filter_1_stocks_sdf
    .withColumn("year", F.year("date"))
    # Zero-padded "01".."12" so string order matches calendar order.
    .withColumn("month", F.format_string("%02d", F.month("date")))
    .groupBy("org", "year", "month")
    .agg(F.round(F.avg("close"), 3).alias("close"))
    .orderBy("org", "year", "month")
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 10 rows; filter_2_stocks_sdf is already sorted by org, year, month.
filter_2_stocks_sdf.createOrReplaceTempView("test_3_1")
test_3_1_sdf = spark.sql("SELECT * FROM test_3_1 LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_3_1_sdf` into this kernel as a pandas DataFrame.
%spark -o test_3_1_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3.2: Reshape DataFrame Again

In [None]:
%%spark

from pyspark.sql import functions as F

# Pivot keys must match the zero-padded month strings ("01".."12") in filter_2_stocks_sdf.
month_keys = [str(m).zfill(2) for m in range(1, 13)]

# One row per (org, year), one column per month. filter_2_stocks_sdf already has a
# single row per (org, year, month), so avg() just carries that value through.
# Functions go through `F`; importing pyspark's `round` by name would shadow the
# builtin round() in the shared %%spark session.
filter_3_stocks_sdf = (
    filter_2_stocks_sdf
    .groupBy("org", "year")
    .pivot("month", month_keys)
    .agg(F.avg("close").alias("close"))
)

# Keep only org-years that have a value for every month.
filter_3_stocks_sdf = filter_3_stocks_sdf.na.drop()

# Rename "01".."12" to "jan_stock".."dec_stock". The loop variable is not called
# `month`, so it cannot rebind a pyspark `month` function in the shared session.
month_names = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
for month_num, month_abbr in enumerate(month_names, start=1):
    filter_3_stocks_sdf = filter_3_stocks_sdf.withColumnRenamed(str(month_num).zfill(2), f"{month_abbr}_stock")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 10 rows ordered by org, then year.
filter_3_stocks_sdf.createOrReplaceTempView("test_3_2")
test_3_2_sdf = spark.sql("SELECT * FROM test_3_2 ORDER BY org, year ASC LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_3_2_sdf` into this kernel as a pandas DataFrame.
%spark -o test_3_2_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3.3: Direction of Change

In [None]:
%%spark

from pyspark.sql.functions import col, when

# +1 when December's average close is above January's, otherwise -1
# (ties, and any null comparison, fall through to -1).
went_up = col("dec_stock") > col("jan_stock")
stocks_train_sdf = filter_3_stocks_sdf.withColumn("direction", when(went_up, 1).otherwise(-1))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Test snapshot: first 10 rows ordered by org, then year.
stocks_train_sdf.createOrReplaceTempView("test_3_3")
test_3_3_sdf = spark.sql("SELECT * FROM test_3_3 ORDER BY org, year ASC LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_3_3_sdf` into this kernel as a pandas DataFrame.
%spark -o test_3_3_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Step 4: Combining LinkedIn and Stocks Data

### 4.1: Combination

In [None]:
%%spark

hire_train_sdf.createOrReplaceTempView("hire_train")
stocks_train_sdf.createOrReplaceTempView("stock_train")

# Join each (org, year) of hiring data (h) with:
#   c -- the company's monthly average stock prices for the SAME year, and
#   u -- the company's stock direction for the FOLLOWING year (u.year = h.year + 1),
#        returned as stock_result.
# Inner joins: a training row needs both matches. dropna() below additionally
# removes any row that still contains a null.
query = ''' SELECT h.org, h.year,
        h.jan_hired, h.feb_hired, h.mar_hired, h.apr_hired, h.may_hired,
        h.jun_hired, h.jul_hired, h.aug_hired, h.sep_hired, h.oct_hired, h.nov_hired, h.dec_hired,
        c.jan_stock, c.feb_stock, c.mar_stock, c.apr_stock, c.may_stock, c.jun_stock, c.jul_stock,
        c.aug_stock, c.sep_stock, c.oct_stock, c.nov_stock, c.dec_stock,
        u.direction AS stock_result
        FROM hire_train AS h
        INNER JOIN stock_train AS c ON h.org = c.org AND h.year = c.year
        INNER JOIN stock_train AS u ON h.org = u.org AND u.year = h.year + 1
        '''

training_sdf = spark.sql(query).dropna()

training_sdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|             org|year|jan_hired|feb_hired|mar_hired|apr_hired|may_hired|jun_hired|jul_hired|aug_hired|sep_hired|oct_hired|nov_hired|dec_hired|jan_stock|feb_stock|mar_stock|apr_stock|may_stock|jun_stock|jul_stock|aug_stock|sep_stock|oct_stock|nov_stock|dec_stock|stock_result|
+----------------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------+
|   Cisco Systems|2006|        4|        3|        1|        3|        3|        4|        2|        3|        2|        2|        3|        0|     15.4|   15.993|   17.

In [None]:
%%spark
# Test snapshot: first 10 training rows ordered by org, then year.
training_sdf.createOrReplaceTempView("test_4")
test_4_sdf = spark.sql("SELECT * FROM test_4 ORDER BY org, year ASC LIMIT 10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Copy the remote `test_4_sdf` into this kernel as a pandas DataFrame.
%spark -o test_4_sdf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Step 5: Twitter followers
Use the concepts of graphs and BFS!

In [None]:
# Local PySpark for the Step 5 cells below that run in this kernel (without %%spark).
# NOTE(review): unpinned — consider pinning a version for reproducibility.
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Local, in-kernel Spark session (this cell has no %%spark) for the cells below
# that also run without %%spark.
appName = "PySpark"
builder = SparkSession.builder.appName(appName)
spark = builder.getOrCreate()

### 5.1: “Traversing” a Graph

In [None]:
import pandas as pd
from IPython.display import Image as I

# Wikimedia Commons animations of breadth-first and depth-first search.
_COMMONS_BASE = 'https://upload.wikimedia.org/wikipedia/commons/'
bfsgif = _COMMONS_BASE + '5/5d/Breadth-First-Search-Algorithm.gif'
dfsgif = _COMMONS_BASE + '7/7f/Depth-First-Search.gif'

#### 5.1.2 Implement One Traversal

In [None]:
%%spark
from pyspark.sql import SparkSession
import pandas as pd

# Small directed demo graph as (from_node, to_node) edges; note the F -> A back edge.
simple = [('A', 'B'),
          ('A', 'C'),
          ('A', 'D'),
          ('C', 'F'),
          ('F', 'A'),
          ('B', 'G'),
          ('G', 'H'),
          ('D', 'E')]

# Same edges in column-oriented form for pandas -> Spark.
simple_dict = {'from_node': [src for src, _ in simple],
               'to_node': [dst for _, dst in simple]}

simple_graph_df = pd.DataFrame.from_dict(simple_dict)
simple_graph_sdf = spark.createDataFrame(simple_graph_df)
simple_graph_sdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-------+
|from_node|to_node|
+---------+-------+
|        A|      B|
|        A|      C|
|        A|      D|
|        C|      F|
|        F|      A|
|        B|      G|
|        G|      H|
|        D|      E|
+---------+-------+

In [None]:
%%spark
# BFS origins: a list of {'node': ...} dicts; here a single start node, A.
smallOrig = [dict(node='A')]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Hand-built BFS state after one round from A: A at distance 0, its successors at 1.
simple_1_round_dict = {
    'node': ['B', 'D', 'C', 'A'],
    'distance': [1, 1, 1, 0],
}
simple_1_round_bfs_df = pd.DataFrame.from_dict(simple_1_round_dict)
simple_1_round_bfs_sdf = spark.createDataFrame(simple_1_round_bfs_df)

simple_1_round_bfs_sdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+
|node|distance|
+----+--------+
|   B|       1|
|   D|       1|
|   C|       1|
|   A|       0|
+----+--------+

In [None]:
def spark_bfs_1_round(visited_nodes, G=None):
    """Expand a breadth-first search by one hop.

    NOTE(review): in this export the cell defining this function has no %%spark
    header, yet it is called from %%spark cells and reads the remote
    `simple_graph_sdf` — confirm it is defined in the remote session.

    Args:
        visited_nodes: Spark DataFrame with columns ``node`` and ``distance``.
        G: Spark DataFrame of directed edges with ``from_node`` / ``to_node``
            columns. Defaults to the global ``simple_graph_sdf``, so one-argument
            calls keep their previous meaning.

    Returns:
        Spark DataFrame: ``visited_nodes`` UNION every not-yet-visited node that
        is one edge away from a visited node, at (smallest predecessor distance) + 1.
        Registers the temp views "visited_nodes_graph", "G" and "new_visited_nodes".
    """
    graph = simple_graph_sdf if G is None else G

    visited_nodes.createOrReplaceTempView("visited_nodes_graph")
    graph.createOrReplaceTempView("G")

    # Edges that start at a visited node and end at an unvisited one. GROUP BY + MIN
    # keeps exactly one row per newly reached node, even when it is reachable from
    # visited nodes that sit at different distances.
    query = '''SELECT G.to_node AS node, MIN(vng.distance) + 1 AS distance
               FROM G
               INNER JOIN visited_nodes_graph vng ON G.from_node = vng.node
               LEFT ANTI JOIN visited_nodes_graph already_visit ON G.to_node = already_visit.node
               GROUP BY G.to_node
    '''
    new_visited_nodes_sdf = spark.sql(query)
    new_visited_nodes_sdf.createOrReplaceTempView("new_visited_nodes")

    # UNION (not UNION ALL) also removes duplicate rows.
    union_query = '''SELECT node, distance FROM visited_nodes_graph
                     UNION
                     SELECT node, distance FROM new_visited_nodes
    '''
    return spark.sql(union_query)


In [None]:
%%spark
# Run one BFS round from the hand-built 1-round state and show up to 10 rows.
simple_bfs_result = spark_bfs_1_round(simple_1_round_bfs_sdf)
simple_bfs_result.show(n=10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+
|node|distance|
+----+--------+
|   D|       1|
|   B|       1|
|   C|       1|
|   A|       0|
|   E|       2|
|   G|       2|
|   F|       2|
+----+--------+

We'll copy the result to Colab and convert it to pandas.

In [None]:
# Copy the remote `simple_bfs_result` into this kernel as a pandas DataFrame.
%spark -o simple_bfs_result

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Convert this result to Pandas, sort your dataframe by `node` ascending, and submit it to the autograder.

In [None]:
# Deterministic row order (ascending by node) for submission.
simple_bfs_test = simple_bfs_result.sort_values("node")

#### 5.1.3 Full BFS Implementation

In [None]:
%%spark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def spark_bfs(G, origins, max_depth):
    """
    Breadth-first search over the edge-list graph G for up to max_depth hops.

    param G: Spark DataFrame of directed edges with columns from_node and to_node
    param origins: list of origin nodes stored as {"node": nodeValue}
    param max_depth: integer number of BFS rounds (hops) to expand
    return: Spark DataFrame with columns node, distance of all visited nodes
    """
    schema = StructType([
        StructField("node", StringType(), nullable=True),
        StructField("distance", IntegerType(), nullable=True),
    ])

    # Every origin starts at distance 0.
    origins_df = spark.createDataFrame([(origin['node'], 0) for origin in origins], schema)

    # Bug fix: the previous version registered G here but then delegated each
    # round to spark_bfs_1_round(visited_nodes), which always reads the global
    # simple_graph_sdf, so the G argument was silently ignored. Each round
    # below now queries this "simple_graph" view, i.e. the graph passed in.
    G.createOrReplaceTempView("simple_graph")

    # One BFS round: keep everything already visited, and add nodes one edge
    # away from a visited node that are not yet visited (anti join), at the
    # predecessor's distance + 1. UNION also drops duplicate rows.
    one_round_query = '''WITH frontier AS (
                             SELECT DISTINCT g.to_node AS node, vng.distance + 1 AS distance
                             FROM simple_graph g
                             INNER JOIN visited_nodes_graph vng ON g.from_node = vng.node
                             LEFT ANTI JOIN visited_nodes_graph already_visit
                               ON g.to_node = already_visit.node
                         )
                         SELECT node, distance FROM visited_nodes_graph
                         UNION
                         SELECT node, distance FROM frontier
    '''

    visited_nodes = origins_df
    for _ in range(max_depth):
        visited_nodes.createOrReplaceTempView("visited_nodes_graph")
        visited_nodes = spark.sql(one_round_query)
        visited_nodes.createOrReplaceTempView("visited_nodes")

    return visited_nodes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Three BFS rounds from the smallOrig origins over simple_graph_sdf.
simple_bfs_iterative_result = spark_bfs(simple_graph_sdf, smallOrig, max_depth=3)
simple_bfs_iterative_result.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------+
|node|distance|
+----+--------+
|   A|       0|
|   B|       1|
|   D|       1|
|   C|       1|
|   G|       2|
|   F|       2|
|   E|       2|
|   H|       3|
+----+--------+

### 5.2: Popular People

In [None]:
# Fetch the Twitter follower edge list into the working directory as twitter_followers.csv.
# NOTE(review): -nc (no-clobber) combined with -O may not skip an existing file on
# every wget version -- confirm re-runs behave as intended.
!wget -nc https://storage.googleapis.com/penn-cis5450/twitter_followers.csv -O twitter_followers.csv

--2024-03-12 16:04:56--  https://storage.googleapis.com/penn-cis5450/twitter_followers.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.101.207, 142.251.2.207, 2607:f8b0:4023:c0d::cf, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.101.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1066 (1.0K) [text/csv]
Saving to: ‘twitter_followers.csv’


2024-03-12 16:04:56 (13.6 MB/s) - ‘twitter_followers.csv’ saved [1066/1066]



In [None]:
# First CSV line is a header; without inferSchema every column is read as a string.
# Assumes the Colab working directory (/content), where the wget above saved the file.
twitter_sdf = spark.read.option("header", True).csv("/content/twitter_followers.csv")

In [None]:
# Preview the first 20 rows (DataFrame.show's default).
twitter_sdf.show()

+---+-----------+---------+
|_c0|      User1|    User2|
+---+-----------+---------+
|  0|      James|   Ashley|
|  1|      Karen|   Thomas|
|  2|     Sandra|    Karen|
|  3|      Karen|   Joseph|
|  4|   Patricia| Margaret|
|  5|      Karen|  Charles|
|  6|    Charles| Patricia|
|  7|Christopher|   Ashley|
|  8|  Elizabeth|    Karen|
|  9|    Barbara|  Anthony|
| 10|       Mark|  Michael|
| 11|    Anthony|  Jessica|
| 12|  Elizabeth|    Karen|
| 13|    Michael|  Richard|
| 14|    Anthony|  Charles|
| 15|      Karen|   Sandra|
| 16|      Karen| Margaret|
| 17|     Robert|   Daniel|
| 18|      Karen|    David|
| 19|Christopher|Elizabeth|
+---+-----------+---------+
only showing top 20 rows



In [None]:
twitter_sdf.createOrReplaceTempView("Twitter")

# Follower count per user: the query treats each row as "User1 follows User2",
# so it groups by User2 (Name) and counts User1 (Followers).
# NOTE(review): duplicate rows (e.g. Elizabeth -> Karen appears twice in the
# preview) are counted twice; use COUNT(DISTINCT User1) if Followers should mean
# distinct followers -- confirm the intended definition.
# NOTE(review): ORDER BY Followers DESC leaves the relative order of tied counts unspecified.
query = ''' SELECT User2 AS Name,
          COUNT(User1) AS Followers
          FROM Twitter
          GROUP BY User2
          HAVING COUNT(User1) > 0
          ORDER BY Followers DESC
'''
count_sdf = spark.sql(query)
count_sdf.show()

+--------+---------+
|    Name|Followers|
+--------+---------+
|Patricia|        6|
|   Karen|        5|
|Jennifer|        4|
| Matthew|        3|
|    John|        3|
|   Nancy|        3|
|  Thomas|        3|
|Margaret|        2|
|   Susan|        2|
| Charles|        2|
|  Ashley|        2|
|  Donald|        2|
|  Sandra|        2|
|   Betty|        2|
| Anthony|        2|
|  Joseph|        2|
|  Daniel|        2|
|   James|        1|
|   Linda|        1|
| Michael|        1|
+--------+---------+
only showing top 20 rows



In [None]:
# Collect the per-user follower counts to the driver as a Pandas DataFrame.
count_df = count_sdf.toPandas()

In [None]:
# Re-register twitter_sdf (the same DataFrame used for the "Twitter" view in the
# follower-count cell) so it can be queried as "twitter".
twitter_sdf.createOrReplaceTempView("twitter")

# Reshape follower pairs into the edge-list schema the BFS functions query:
# an edge from_node -> to_node for each "User1 follows User2" row.
query = ''' SELECT User1 AS from_node, User2 AS to_node
    FROM twitter
'''

twitter_graph_sdf = spark.sql(query)

### 5.3: The Super Bacon Search

In [None]:
# BFS origins in the {"node": value} format spark_bfs expects: start from Karen.
orig = [dict(node='Karen')]

In [None]:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType

def spark_bfs_1_round(G, visited_nodes):
  """
  Expand the BFS visited set by one hop.

  param G: edge-list dataframe with columns from_node and to_node
  param visited_nodes: dataframe with columns node and distance
  return: dataframe of updated visited nodes, with columns node and distance
  """
  # Make both inputs addressable from Spark SQL.
  visited_nodes.createOrReplaceTempView("visited_nodes_graph")
  G.createOrReplaceTempView("G")

  # Nodes one edge away from any visited node, minus those already visited
  # (anti join); each is assigned its predecessor's distance + 1.
  frontier_sql = '''SELECT DISTINCT G.to_node AS node, visited.distance + 1 AS distance
          FROM G
          INNER JOIN visited_nodes_graph visited ON G.from_node = visited.node
          LEFT ANTI JOIN visited_nodes_graph already ON G.to_node = already.node
  '''
  spark.sql(frontier_sql).createOrReplaceTempView("new_visited_nodes")

  # Merge old and new; UNION (not UNION ALL) also removes duplicate rows.
  merge_sql = '''SELECT node, distance FROM visited_nodes_graph
              UNION
              SELECT node, distance FROM new_visited_nodes

  '''
  return spark.sql(merge_sql)



def spark_bfs(G, origins, max_depth):
  """
  Breadth-first search over the edge list G, expanding max_depth rounds.

  param G: graph dataframe from 5.1.2 (edge list with from_node, to_node)
  param origins: list of origin nodes stored as {"node": nodeValue}
  param max_depth: integer value of max depth to run BFS to
  return: dataframe with columns node, distance of all visited nodes
  """
  # Schema of the visited-nodes frame (a StructType, not a DataFrame).
  visited_schema = StructType([
      StructField("node", StringType(), nullable=True),
      StructField("distance", IntegerType(), nullable=True),
  ])

  # Seed the search: every origin is visited at distance 0.
  seed_rows = [(origin['node'], 0) for origin in origins]
  visited_nodes = spark.createDataFrame(seed_rows, visited_schema)

  G.createOrReplaceTempView("simple_graph")

  for _ in range(max_depth):
    visited_nodes = spark_bfs_1_round(G, visited_nodes)
    visited_nodes.createOrReplaceTempView("visited_nodes")

  return visited_nodes


# Five BFS rounds outward from the origins in orig over the follower graph.
bfs_5 = spark_bfs(twitter_graph_sdf, orig, max_depth=5)
bfs_5.show()

+--------+--------+
|    node|distance|
+--------+--------+
|   Karen|       0|
|  Thomas|       1|
|Margaret|       1|
|  Sandra|       1|
|  Joseph|       1|
| Charles|       1|
|   David|       1|
|Patricia|       2|
|    Lisa|       2|
| Barbara|       3|
| Anthony|       4|
|   James|       4|
|  Daniel|       4|
|Jennifer|       5|
|   Nancy|       5|
| Jessica|       5|
|   Sarah|       5|
|  Ashley|       5|
+--------+--------+



In [None]:
# Collect the 5-round BFS result (origin: Karen) to the driver as a Pandas DataFrame.
answer_df = bfs_5.toPandas()