In [7]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

## Part 1: Merging Our Two Datasets Together

### This is the only part done in pandas, because of its simple API -- all other data preprocessing is done in PySpark

In [8]:
# Load the two raw CSV inputs into pandas DataFrames.
prices, fundamentals = (
    pd.read_csv(csv_path) for csv_path in ('prices.csv', 'fundamentals.csv')
)

In [9]:
# Replace the `date` column with parsed datetimes so the `.dt` accessor can be used on it.
prices = prices.assign(date=pd.to_datetime(prices["date"]))

In [10]:
# Derive the calendar quarter and year of every price row; these are the merge keys.
price_dates = prices['date'].dt
prices = prices.assign(quarter=price_dates.quarter, year=price_dates.year)

In [11]:
def _to_snake_case(label):
    """Lower-case a column label, turning '.' and single spaces into '_'."""
    return "_".join(label.lower().replace(".", "_").split(" "))


# 1. drop the first column, 2. normalise every column name,
# 3. rename the ticker column to `symbol`.
fundamentals = (
    fundamentals
    .drop(columns=fundamentals.columns[:1])
    .rename(columns=_to_snake_case)
    .rename(columns={'ticker_symbol': 'symbol'})
)

In [12]:
# Parse `period_ending` into a datetime `date` column (appended as a new column).
fundamentals = fundamentals.assign(date=pd.to_datetime(fundamentals["period_ending"]))

In [13]:
# Derive the merge keys from the parsed period-end date, then discard the
# helper `date` column.
period_end = fundamentals['date'].dt
fundamentals = (
    fundamentals
    .assign(quarter=period_end.quarter, year=period_end.year)
    .drop(columns=['date'])
)

In [14]:
# Inner join: keep only price rows whose (symbol, year, quarter) also appears
# in the fundamentals table.
combined = pd.merge(
    prices,
    fundamentals,
    how='inner',
    on=['symbol', 'year', 'quarter'],
)

In [None]:
# Write the merged frame to stocks.csv; index=False keeps the pandas row
# index out of the file.
combined.to_csv('stocks.csv',index=False)

### Number of Records

In [None]:
# Row count of the merged frame.
combined.shape[0]

### Number of Columns

In [None]:
# Column count of the merged frame.
combined.shape[1]

### Statistical Summary of Response Variable

In [None]:
# `close` is the response variable; show its summary statistics.
response = combined.loc[:, 'close']
response.describe()

### Statistical Summary of (Top 10) Predictor Variables

In [None]:
# Columns carried over from the price table (plus the derived year),
# followed by the columns taken from the fundamentals table.
selectedCols = (
    ['date', 'symbol', 'open', 'volume', 'year']
    + ['cash_ratio', 'accounts_payable', 'gross_profit', 'net_income', 'long-term_debt']
)

In [None]:
# Keep every row, but only the selected predictor columns.
parsed = combined.loc[:, selectedCols]

#### Numerical Columns

In [None]:
# Summary statistics for the selected predictor columns.
parsed.describe()

#### Categorical Column

In [None]:
# Number of merged rows per ticker symbol, most frequent first.
combined['symbol'].value_counts()

### Helpful Graphs

#### Long Term Debt vs. Net Income

In [None]:
# x: long-term debt, y: net income (one point per merged row).
debt_fig, debt_ax = plt.subplots()
debt_ax.scatter(parsed['long-term_debt'], parsed['net_income'])
plt.show()

#### Correlation Matrix of the Top 10 Predictors

In [None]:
# `parsed` also holds the non-numeric `date` and `symbol` columns. Since
# pandas 2.0 `DataFrame.corr()` no longer drops such columns silently and
# raises instead, so restrict the frame to numeric columns explicitly.
numeric_predictors = parsed.select_dtypes(include='number')
correlations = numeric_predictors.corr()

plt.matshow(correlations)
# Label both axes with the column names so each cell of the matrix can be read.
tick_positions = range(len(correlations.columns))
plt.xticks(tick_positions, correlations.columns, rotation=90)
plt.yticks(tick_positions, correlations.columns)
plt.show()

#### Histogram of Gross Profit

In [None]:
# Distribution of gross profit across all merged rows, in 20 equal-width bins.
plt.hist(parsed['gross_profit'],bins=20)
plt.xlabel('gross_profit')
plt.ylabel('Number of records')
# Render explicitly, like the other plot cells, so the cell does not echo
# the raw (counts, bin edges, patches) tuple returned by plt.hist.
plt.show()

### Average Net Income by Year and Quarter

In [None]:
# Mean net income per (year, quarter); as_index=False keeps the two keys as
# ordinary columns instead of an index.
year_breakdown = combined.groupby(['year', 'quarter'], as_index=False)['net_income'].mean()

In [None]:
# Build labels such as "2012Q1" for the x axis.
year_text = year_breakdown['year'].astype(str)
quarter_text = year_breakdown['quarter'].astype(str)
x = year_text + "Q" + quarter_text

In [None]:
# Wide canvas so the per-quarter labels on the x axis have room.
fig, ax = plt.subplots(figsize=(20, 10))
ax.scatter(x, year_breakdown['net_income'])
plt.show()

## Part 2: Data Preprocessing in PySpark

In [1]:
from pyspark.mllib.stat import Statistics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col 
from pyspark.ml.feature import StandardScaler,Bucketizer
from pyspark.sql.types import FloatType
from pyspark.sql.functions import isnan, when, count, col
# import data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Reuse the active SparkSession if one exists, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

In [17]:
# Read the merged CSV into a Spark DataFrame. The first line is the header
# and column types are inferred from the data.
filename = 'stocks.csv'
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(filename)
    .cache()  # mark for caching; returns the same DataFrame
)
df

DataFrame[date: timestamp, symbol: string, open: double, close: double, low: double, high: double, volume: double, quarter: int, year: int, period_ending: timestamp, accounts_payable: double, accounts_receivable: double, add'l_income/expense_items: double, after_tax_roe: double, capital_expenditures: double, capital_surplus: double, cash_ratio: double, cash_and_cash_equivalents: double, changes_in_inventories: double, common_stocks: double, cost_of_revenue: double, current_ratio: double, deferred_asset_charges: double, deferred_liability_charges: double, depreciation: double, earnings_before_interest_and_tax: double, earnings_before_tax: double, effect_of_exchange_rate: double, equity_earnings/loss_unconsolidated_subsidiary: double, fixed_assets: double, goodwill: double, gross_margin: double, gross_profit: double, income_tax: double, intangible_assets: double, interest_expense: double, inventory: double, investments: double, liabilities: double, long-term_debt: double, long-term_inves

In [19]:
# Print each column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open: double (nullable = true)
 |-- close: double (nullable = true)
 |-- low: double (nullable = true)
 |-- high: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- period_ending: timestamp (nullable = true)
 |-- accounts_payable: double (nullable = true)
 |-- accounts_receivable: double (nullable = true)
 |-- add'l_income/expense_items: double (nullable = true)
 |-- after_tax_roe: double (nullable = true)
 |-- capital_expenditures: double (nullable = true)
 |-- capital_surplus: double (nullable = true)
 |-- cash_ratio: double (nullable = true)
 |-- cash_and_cash_equivalents: double (nullable = true)
 |-- changes_in_inventories: double (nullable = true)
 |-- common_stocks: double (nullable = true)
 |-- cost_of_revenue: double (nullable = true)
 |-- current_ratio: double (nullable = true)
 |-- deferred_asset_

#### Filling NA's

In [20]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# Columns whose missing values will be replaced by the column mean.
missingCols = ['cash_ratio','current_ratio','quick_ratio','for_year','earnings_per_share']

# One mean aggregate per entry of `missingCols`, in the same order, aliased
# as mean_<column>. The single collected Row can therefore be indexed by
# position in `missingCols`.
mean_aggregates = [_mean(col(name)).alias('mean_' + name) for name in missingCols]
df_stats = df.select(mean_aggregates).collect()

# Position 1 is the mean of `current_ratio`.
df_stats[0][1]

185.79550155398894

In [21]:
# Pair each column name with the mean stored at the same position of the
# collected Row, then fill the nulls of those columns with it.
meanFill = dict(zip(missingCols, df_stats[0]))
df = df.fillna(meanFill)

#### Convert String Ticker to Index

In [24]:
from pyspark.ml.feature import StringIndexer

In [25]:
# Learn a ticker -> numeric index mapping from the data, then append it as
# the `symbolIndex` column.
indexer = StringIndexer(inputCol="symbol", outputCol="symbolIndex")
indexer_model = indexer.fit(df)
df = indexer_model.transform(df)

#### OneHot Encoding

In [35]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
# One-hot encode the numeric ticker index into the sparse `symbolVec` column.
encoder = OneHotEncoder(inputCol='symbolIndex',outputCol='symbolVec')

# Spark >= 3.0: OneHotEncoder is an Estimator and has no `transform`; it must
# be fit first to obtain a model. The legacy Spark 2.x class is a plain
# Transformer without `fit`, so fall back to using it directly there.
encoder_model = encoder.fit(df) if hasattr(encoder, 'fit') else encoder
df = encoder_model.transform(df)

# Peek at two encoded rows.
df.select('symbolVec').take(2)

In [45]:
# Mark the transformed frame for caching (cache() returns the same
# DataFrame) and fetch one row, which triggers evaluation.
df.cache().take(1)

Exception ignored in: <function JavaWrapper.__del__ at 0x7f7b1c786d40>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'OneHotEncoder' object has no attribute '_java_obj'


[Row(date=datetime.datetime(2012, 1, 2, 19, 0), symbol='BBY', open=23.700001, close=23.68, low=23.540001, high=23.879998999999998, volume=5431600.0, quarter=1, year=2012, period_ending=datetime.datetime(2012, 3, 2, 19, 0), accounts_payable=7876000000.0, accounts_receivable=41000000.0, add'l_income/expense_items=77000000.0, after_tax_roe=33.0, capital_expenditures=-766000000.0, capital_surplus=0.0, cash_ratio=14.0, cash_and_cash_equivalents=1199000000.0, changes_in_inventories=120000000.0, common_stocks=34000000.0, cost_of_revenue=34473000000.0, current_ratio=116.0, deferred_asset_charges=0.0, deferred_liability_charges=0.0, depreciation=945000000.0, earnings_before_interest_and_tax=2277000000.0, earnings_before_tax=2166000000.0, effect_of_exchange_rate=5000000.0, equity_earnings/loss_unconsolidated_subsidiary=0.0, fixed_assets=3471000000.0, goodwill=1335000000.0, gross_margin=24.0, gross_profit=10984000000.0, income_tax=742000000.0, intangible_assets=359000000.0, interest_expense=11100