In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import to_timestamp

# from pyspark.sql.functions import col, udf
from pyspark.sql.functions import col, unix_timestamp, to_date


In [3]:
spark = pyspark.sql.SparkSession.builder.master("local").getOrCreate()
spark

21/12/07 12:10:16 WARN Utils: Your hostname, LAPTOP-T2T1G8JL resolves to a loopback address: 127.0.1.1; using 172.22.10.85 instead (on interface eth0)
21/12/07 12:10:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/07 12:10:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/07 12:10:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# Let's define our schema
schema = StructType([\
    StructField("timestamp", StringType(), True),\
    StructField("company", StringType(), True),\
    StructField("level", StringType(), True),\
    StructField("title", StringType(), True),\
    StructField("totalyearlycompensation", IntegerType(), False),\
    StructField("location", StringType(), True),\
    StructField("yearsofexperience", FloatType(), False),\
    StructField("yearsatcompany", FloatType(), False),\
    StructField("tag", StringType(), True),\
    StructField("basesalary", IntegerType(), False),\
    StructField("stockgrantvalue", IntegerType(), False),\
    StructField("bonus", IntegerType(), False),\
    StructField("gender", StringType(), True),\
    StructField("cityid", StringType(), True),\
    StructField("dmaid", StringType(), True),\
    StructField("race", StringType(), True),\
    StructField("education", StringType(), True)])

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("csv")\
    .option("header", "false")\
    .option("delimiter", "\t")\
    .schema(schema)\
    .load("data/cleaned.txt")
data.show(n=5)

+------------------+---------+-----+--------------------+-----------------------+-----------------+-----------------+--------------+---+----------+---------------+-----+------+------+-----+----+---------+
|         timestamp|  company|level|               title|totalyearlycompensation|         location|yearsofexperience|yearsatcompany|tag|basesalary|stockgrantvalue|bonus|gender|cityid|dmaid|race|education|
+------------------+---------+-----+--------------------+-----------------------+-----------------+-----------------+--------------+---+----------+---------------+-----+------+------+-----+----+---------+
|  06/07/2017 11:33|   ORACLE|   L3|     Product Manager|                 127000| Redwood City, CA|              1.5|           1.5| NA|    107000|          20000|10000|    NA|  7392|  807|  NA|       NA|
|  06/10/2017 17:11|     EBAY| SE 2|   Software Engineer|                 100000|San Francisco, CA|              5.0|           3.0| NA|    141907|              0|    0|    NA|  74

In [None]:
data.columns

## Correlation

In [None]:
# From stackoverflow
data.stat.corr("totalyearlycompensation","basesalary")

In [None]:
# From stackoverflow
data.stat.corr("totalyearlycompensation","basesalary", "pearson")

In [None]:
# From stackoverflow
data.stat.corr("totalyearlycompensation","basesalary", "spearman")

## Dataframe Generic Guide

In [None]:
data.describe("basesalary")

In [None]:
summary = data.describe("basesalary")
summary.show()

## Average Base Salary from 2017 to 2021

In [None]:
## Converting it to numpy dataframe

basesalary_dates2 = data.select('timestamp', 'basesalary').toPandas()
basesalary_dates2["timestamp"] = pd.to_datetime(basesalary_dates2["timestamp"])
basesalary_dates2['YearMonth'] = basesalary_dates2['timestamp'].apply(lambda x: '{year}-{month}'.format(year=x.year, month=x.month))
basesalary_dates2 = basesalary_dates2.groupby('YearMonth', as_index=False).agg({'basesalary' : ['mean']})
basesalary_dates2.columns = ['Date', 'Avg_Salary']
basesalary_dates2['Date'] = pd.to_datetime(basesalary_dates2['Date'],format='%Y-%m')

In [None]:
fig, ax = plt.subplots(figsize=(20, 7))
basesalary_dates.plot(x="Date", ax=ax, legend=None)
plt.title("Average Base Salary", fontsize=26)
plt.xlabel("Year/Month")
plt.ylabel("Salary", fontsize=20)

In [None]:
## Leaving it as spark default table

basesalary_dates = data.select('timestamp', 'basesalary')

In [None]:
basesalary_dates['timestamp'] = basesalary_dates.select(to_timestamp(basesalary_dates.timestamp, 'dd/MM/YYY HH:mm').alias('timestamp'))

In [None]:
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

In [None]:
basesalary_dates1 = basesalary_dates.withColumn(to_timestamp(col('timestamp'), 'dd/MM/YYY HH:mm'), )

In [None]:
basesalary_dates1 = basesalary_dates.withColumn("record_date",basesalary_dates['timestamp'].cast(TimestampType()))
#below is the result

In [None]:
basesalary_dates1.show(3)

In [None]:
## Followed this link https://stackoverflow.com/questions/47953320/change-column-type-from-string-to-date-in-pyspark/47953572

basesalary_dates1 = basesalary_dates.withColumn('date_in_dateFormat', 
                   to_date(unix_timestamp(col('timestamp'), 'MM/dd/yyyy HH:mm:ss').cast("timestamp"))).select('date_in_dateFormat', 'basesalary')

In [None]:
basesalary_dates1.show(2)

In [None]:
## Cant get these working from this link https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/

basesalary_dates1.groupBy("date_in_dateFormat").mean("basesalary").show(10, truncate=False)

In [None]:
basesalary_dates1.groupBy("date_in_dateFormat").min("basesalary").show()

In [14]:
data['timestamp'].show(5, truncate=False)

TypeError: 'Column' object is not callable