In [1]:
from pyspark import SQLContext, SparkConf, SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from numpy import polyfit


# Local, single-JVM Spark entry points shared by every later cell.
conf = SparkConf().setMaster('local').setAppName('ML_learning')
# getOrCreate rather than the bare constructor: only one SparkContext may be
# active per process, so re-running this cell in a live kernel would raise.
sc = SparkContext.getOrCreate(conf=conf)
sqlcontext = SQLContext(sc)

In [2]:
# Base name of the input file; it is reused later to name the prediction CSVs.
fname = '27-1'
# Read the CSV with a header row and let Spark infer each column's type.
data = sqlcontext.read.csv(
    path='{}.csv'.format(fname),
    header=True,
    inferSchema=True,
)
 

In [3]:
def manipulateData(colnum):
    """Build a (features, label) training frame for one column of ``data``.

    The column at position 1 of the module-level ``data`` frame is assembled
    into a single-element ``features`` vector, and the column at position
    ``colnum + 1`` is renamed to ``label``.

    Args:
        colnum: 1-based offset of the target column; the label is read from
            ``data.columns[colnum + 1]``.

    Returns:
        A Spark DataFrame with exactly two columns, ``features`` and ``label``.
    """
    # Resolve the names from the same positions used for the selection, so the
    # function does not depend on the header spelling of either column.
    feature_col = data.columns[1]
    label_col = data.columns[colnum + 1]

    selected = data.select(feature_col, label_col)
    assembler = VectorAssembler(inputCols=[feature_col], outputCol='features')
    assembled = assembler.transform(selected)

    return assembled.select('features', label_col).withColumnRenamed(label_col, 'label')

In [4]:
# Total number of columns in the loaded frame; later cells size their
# per-column lists as n - 2.
n = len(data.columns)

In [5]:
# One training frame per target column: manipulateData(1) .. manipulateData(n - 2).
traindatapercity = [manipulateData(colnum) for colnum in range(1, n - 1)]

In [6]:
# Preview the first per-column training frame.
traindatapercity[0].show()

+--------+-----+
|features|label|
+--------+-----+
|   [1.0]|  172|
|   [2.0]|  206|
|   [3.0]|  177|
|   [4.0]|  191|
|   [5.0]|  192|
|   [6.0]|  177|
|   [7.0]|  182|
+--------+-----+



In [7]:
# Fit one linear regression per training frame.
# A single estimator is enough: fit() returns a new fitted model each time, so
# the estimator does not need to be rebuilt inside the loop.
lr = LinearRegression()
lrmodelpercity = [lr.fit(train_df) for train_df in traindatapercity]


In [12]:
# Inputs to predict for: the integers -3 .. 2, as a one-column frame named 'numbers'.
li = list(range(-3, 3))
rdd1 = sc.parallelize(li)
row_rdd = rdd1.map(lambda value: Row(value))
predictinput = sqlcontext.createDataFrame(row_rdd, schema=['numbers'])

In [13]:
#predictinput.show()

In [14]:
# Pack 'numbers' into the single-element 'features' vector column and keep
# only that column as the prediction input.
assembler = VectorAssembler(inputCols=['numbers'], outputCol='features')
predictinput2 = assembler.transform(predictinput)
predictinput3 = predictinput2.select('features')
predictinput3.show(5)

+--------+
|features|
+--------+
|  [-1.0]|
+--------+



In [11]:

# Export the predictions of every fitted model, CHUNK_SIZE models per CSV file.
# Each file holds the 'features' column followed by one prediction column per
# model, named after the model's 1-based position.
CHUNK_SIZE = 100
num_models = len(lrmodelpercity)

# The number of chunks follows from the number of models, so the export works
# for any model count rather than only for 301-400.
for chunk_start in range(0, num_models, CHUNK_SIZE):
    chunk_end = min(chunk_start + CHUNK_SIZE, num_models)
    predtable = predictinput3
    for model_idx in range(chunk_start, chunk_end):
        # transform() appends a 'prediction' column next to the existing ones.
        # Renaming it right away frees the name for the next model and keeps
        # every prediction on the row it was computed from, with no need for a
        # synthetic row index and a join.
        predtable = (
            lrmodelpercity[model_idx]
            .transform(predtable)
            .withColumnRenamed('prediction', str(model_idx + 1))
        )
    temppd = predtable.toPandas()
    name = fname+'-'+'predictions'+str(chunk_start // CHUNK_SIZE + 1)+'.csv'
    temppd.to_csv(name)