In [None]:
from pyspark.sql import SparkSession,  Row
from matplotlib import pyplot as plt
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet, ElasticNetCV
import pandas as pd
import numpy as np

pd.set_option("display.max_columns", 100)

# Semicolon-delimited CSV with a header row.
input_path = '/home/jovyan/work/data/autot4.7.csv'

session = (
    SparkSession.builder
    .appName("Car data")
    # NOTE(review): driver memory can only take effect if no Spark JVM is running
    # yet in this kernel; getOrCreate() reuses an existing session as-is.
    .config('spark.driver.memory', '5G')
    .config('spark.executor.memory', '5G')
    .getOrCreate()
)

# .csv() selects Spark's built-in CSV source itself, so no .format(...) call is
# needed (any format set beforehand would be overridden by it). Schema inference
# is not requested, so all columns arrive as strings and are cast explicitly later.
indf = (
    session.read
    .option("header", "true")
    .option("delimiter", ";")
    # Drop rows that fail to parse instead of failing the whole read.
    .option("mode", "DROPMALFORMED")
    .csv(input_path)
)

In [None]:
# Spark interprets '.' in a column reference as struct-field access, so dotted CSV
# headers would need backtick quoting everywhere. Replace every dot with '_'
# (no replacement cap, so headers with many dots are fully cleaned). Re-running
# this cell is a no-op because the new names contain no dots.
new_colnames = [name.replace('.', '_') for name in indf.columns]
indf = indf.toDF(*new_colnames)

In [None]:
# Categorical feature columns.
factorVars = """
    ajoneuvoluokka ajoneuvonkaytto korityyppi ohjaamotyyppi kayttovoima
    istumapaikkojenLkm sylintereidenLkm vaihteisto alue kunta merkki malli
    kayttoonotto_pvm_imputoitu
""".split()

# Numeric columns (including the target, matkamittarilukema).
numericVars = """
    omamassa iskutilavuus suurinNettoteho matkamittarilukema
    kayttoonottoVuosi ensirekVuosi
""".split()

# Date / timestamp columns.
dateVars = """
    ensirekisterointipvm kayttoonottopvm max_date kayttoonotto
""".split()


In [None]:
# Build the typed frame from the column lists so they are the single source of
# truth: factor columns stay as strings, numeric columns are cast to int and
# date columns to timestamp, in list order.
# Derived columns appended to numericVars by later cells (e.g. 'usage_days') do
# not exist in indf, so only raw columns are cast; this keeps the cell re-runnable
# after later cells have executed.
raw_numeric_vars = [c for c in numericVars if c in indf.columns]
carsDf = indf.select(
    *factorVars,
    *[indf[c].cast("int") for c in raw_numeric_vars],
    *[indf[c].cast("timestamp") for c in dateVars],
)

In [None]:
# Print the schema to check the int / timestamp casts applied when building carsDf.
carsDf.printSchema()

In [None]:
# Keep only rows whose matkamittarilukema lies in [1, 1e6] (both ends inclusive),
# and mark the result for caching since later cells keep deriving from it.
ODOMETER_MIN, ODOMETER_MAX = 1, 1e6
in_range = carsDf['matkamittarilukema'].between(ODOMETER_MIN, ODOMETER_MAX)
carsDf = carsDf.where(in_range).cache()

In [None]:
# usage_days = (max_date - kayttoonottopvm) in days. Timestamps are cast to epoch
# seconds, so the difference is divided by the seconds in a day. The result is
# null whenever either timestamp is null.
SECONDS_PER_DAY = 24.0 * 3600.0
carsDf = carsDf.withColumn(
    'usage_days',
    (carsDf['max_date'].cast('long') - carsDf['kayttoonottopvm'].cast('long')) / SECONDS_PER_DAY,
)
# Register the new feature only once: an unguarded append would add it again on
# every re-run, producing duplicate columns in the model matrix.
if 'usage_days' not in numericVars:
    numericVars.append('usage_days')

In [None]:
# Three random splits with relative weights 20/20/60. The fixed seed makes the
# split repeatable for the same input data and partitioning.
SPLIT_WEIGHTS = [0.2, 0.2, 0.6]
SPLIT_SEED = 220274
splits = carsDf.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [None]:
# Only the first (~20% weight) split is collected to the driver as a pandas
# frame; it is the data used for exploration and modelling below.
model_split = splits[0]
modeldf = model_split.toPandas()

In [None]:
# Distribution of the modelling target in the model split.
ax = modeldf['matkamittarilukema'].hist(bins=100)
ax.set(title='Distribution of matkamittarilukema (model split)',
       xlabel='matkamittarilukema', ylabel='rows');

In [None]:
# Disabled exploration: per-factor median and mean of matkamittarilukema for every
# column in factorVars (one printed table per factor). Uncomment to run; the cell
# below does the same for ajoneuvonkaytto only.
#for c in factorVars:
#    print(modeldf.groupby(c)['matkamittarilukema'].agg([np.median, np.mean]))

In [None]:
# Median / mean / standard deviation of the target per ajoneuvonkaytto category.
# String aggregation names are used because passing numpy callables to .agg is
# deprecated in recent pandas; the resulting table is the same.
modeldf.groupby('ajoneuvonkaytto')['matkamittarilukema'].agg(['median', 'mean', 'std'])

In [None]:
# Target against usage_days; pandas labels the axes with the column names.
ax = modeldf.plot.scatter(x='usage_days', y='matkamittarilukema')
ax.set(title='matkamittarilukema vs. usage_days (model split)');

In [None]:
# Numeric feature matrix and target. The target is split off explicitly rather
# than pop()-ed from a column subset of modeldf (which relies on that subset being
# a copy and can raise SettingWithCopyWarning).
traindf = modeldf[numericVars].drop(columns='matkamittarilukema')
y = modeldf['matkamittarilukema']

# Missing numeric values (e.g. usage_days is null when either date is null) are
# mean-imputed, then features are standardised so the ElasticNet penalty treats
# them on a comparable scale. SimpleImputer replaces the removed
# sklearn.preprocessing.Imputer (which was also never imported here).
imputer = SimpleImputer(strategy='mean')
scaler = StandardScaler()
X_raw = imputer.fit_transform(traindf)
X = scaler.fit_transform(X_raw)

# Keep modeldf's row index so later index-aligned column assignments line up.
# NOTE(review): SimpleImputer drops all-NaN columns, which would make the column
# count differ from traindf.columns here.
Xdf = pd.DataFrame(X, columns=traindf.columns, index=traindf.index)

In [None]:
# One-hot encode ajoneuvonkaytto and attach each indicator column to the feature
# frame; each assignment aligns on the shared row index.
ajoneuvonkaytto_dummies = pd.get_dummies(modeldf['ajoneuvonkaytto'], prefix='ajoneuvonkaytto')
for dummy_col in ajoneuvonkaytto_dummies.columns:
    Xdf[dummy_col] = ajoneuvonkaytto_dummies[dummy_col]

In [None]:
# ElasticNet fitted on the feature frame Xdf. Hyperparameters are spelled out for
# easy tuning. `normalize` is not passed: it was removed in scikit-learn 1.2
# (passing it raises TypeError), and normalize=False was the default anyway.
enet = ElasticNet(alpha=1.0, l1_ratio=0.5, fit_intercept=True, precompute=False,
                  max_iter=1000, copy_X=True, tol=0.0001, warm_start=False, positive=False,
                  random_state=None, selection='cyclic')
enet.fit(Xdf, y)

In [None]:
# Fitted coefficients labelled by the feature column they belong to.
coef_by_feature = pd.Series(enet.coef_, index=Xdf.columns)
coef_by_feature

In [None]:
# In-sample check: predictions on the same rows the model was fitted on, plotted
# against the actual target. Points on the red y = x line are predicted exactly.
modeldf['prediction'] = enet.predict(Xdf)

# Span the reference line over the range of both axes so it covers every plotted
# point, including negative predictions (possible for a linear model) and actual
# values above the largest prediction.
line_lo = min(modeldf['prediction'].min(), modeldf['matkamittarilukema'].min())
line_hi = max(modeldf['prediction'].max(), modeldf['matkamittarilukema'].max())
zz = np.array([line_lo, line_hi])

fig, ax = plt.subplots()
modeldf.plot.scatter(x='prediction', y='matkamittarilukema', ax=ax)
ax.plot(zz, zz, 'r-', label='y = x')
ax.set(title='ElasticNet: predicted vs. actual matkamittarilukema (model split)',
       xlabel='prediction', ylabel='matkamittarilukema')
ax.legend()
plt.show()