In [222]:
from pyspark import SparkContext,HiveContext
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame

from pyspark.ml.feature import VectorAssembler,Imputer
from pyspark.ml.regression import GBTRegressor,LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import numpy as np
import pandas as pd

from itertools import combinations
from functools import reduce

from datetime import date,datetime
from dateutil.relativedelta import relativedelta as rd

from sklearn.model_selection import train_test_split

import asyncio

In [11]:
sqlContext = SQLContext(spark)

In [27]:
%%time
df = spark.read.parquet('/stocks_clean/*.parquet')

CPU times: user 763 µs, sys: 4.19 ms, total: 4.95 ms
Wall time: 2.02 s


In [28]:
df = df.withColumn('dif',df.high-df.low)
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- volume: double (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- stock: string (nullable = true)
 |-- dif: double (nullable = true)



In [29]:
varc = ['volume','open','high','low','close','dif']

In [30]:
df = df.filter(df['date']>=date(2011,1,1))

In [31]:
df.registerTempTable('acciones')

In [32]:
query = """
SELECT stock,date,count(*) as casos FROM 
acciones 
GROUP BY stock,date
having count(*)>1
"""

In [33]:
sqlContext.sql(query).show()

+-----+----+-----+
|stock|date|casos|
+-----+----+-----+
+-----+----+-----+



In [34]:
catfh = df.select('date').dropDuplicates().toPandas()
catfh['mes'] = catfh['date'].map(lambda x:x.strftime('%Y%m'))
aux = catfh[['mes']].drop_duplicates().sort_values(by='mes').reset_index(drop=True)
aux.insert(0,'id_mes',aux.index+1)
catfh = catfh.merge(aux,on='mes',how='inner')
catfh = catfh.loc[catfh['id_mes']<115].reset_index(drop=True)
catfh.tail()

Unnamed: 0,date,mes,id_mes
2384,2016-12-19,201612,72
2385,2016-12-21,201612,72
2386,2016-12-28,201612,72
2387,2016-12-07,201612,72
2388,2016-12-02,201612,72


In [35]:
catfh = spark.createDataFrame(catfh[['date','id_mes']])

In [36]:
df = df.join(catfh,['date'],'inner')

In [37]:
df.printSchema()


root
 |-- date: timestamp (nullable = true)
 |-- volume: double (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- stock: string (nullable = true)
 |-- dif: double (nullable = true)
 |-- id_mes: long (nullable = true)



In [38]:
vobs = 12
vdes = 1
step = 3


In [183]:
def ingX(df:DataFrame,ancla:int,sub:int)->DataFrame:
    aux = df.filter((df['id_mes']<=ancla)&(df['id_mes']>=(ancla-sub+1)))
    aux.registerTempTable('aux')
    query = f"""
                with defase as (
                  SELECT 
                    stock, 
                    date, 
                    close, 
                    lag(close,-1) over (
                      partition by stock 
                      order by 
                        date desc
                    ) as previa 
                  from 
                    aux 
                  order by 
                    stock, 
                    date desc
                ) 
                select 
                  stock, 
                  {ancla} as ancla, 
                  avg(
                    (close - previa)/ close
                  ) as x_incpct_prom_{sub}, 
                  avg(
                    case when close > previa then 1 else 0 end
                  ) as x_num_incprecio_{sub}, 
                  avg(
                    case when close < previa then 1 else 0 end
                  ) as x_num_deccprecio_{sub} 
                from 
                  defase 
                group by 
                  stock

            """ 
    piv = sqlContext.sql(query)
    for v1,v2 in combinations(varc,2):
        aux = aux.withColumn(f'ratio_{v1}_{v2}',aux[v1]/aux[v2])
    
    ratios = [v for v in aux.columns if v.startswith('ratio_')]    
    expr = [f(F.col(v)).alias(f'x_{n}_{v}_{sub}') for v in varc+ratios for f,n in zip([F.min,F.max,F.mean,F.stddev],['min','max','mean','std'])]
    aux =aux.groupBy('stock').agg(*expr)
    aux = aux.withColumn('ancla',F.lit(ancla))
    return aux.join(piv,['stock','ancla'],'inner')

In [103]:
anclai, anclaf = df.select('id_mes').dropDuplicates().toPandas().agg(['min','max']).T.values[0]
anclai, anclaf = anclai+vobs-1,anclaf-vdes
anclai, anclaf

(12, 113)

In [107]:
cruzar = lambda x,y: x.join(y,um,'outer')
apilar = lambda x,y: x.union(y)

In [110]:
um = ['stock','ancla']

In [184]:
async def repVect (ancla:int):
    print(f"generando X ({ancla})")
    aux = reduce(cruzar,map(lambda sub:ingX(df,ancla,sub),range(step,vobs+step,step)))
    ancla = "%03d"%ancla
    aux.write.parquet(f'/stocks_feature/X_{ancla}.parquet',mode='overwrite') 

In [185]:
await asyncio.gather(*map(repVect,range(anclai,anclai+5)))

generando X (12)
generando X (13)
generando X (14)
generando X (15)
generando X (16)


[None, None, None, None, None]

In [176]:
async def ingY(df:DataFrame,ancla:int)->DataFrame:
    print(f"generando Y ({ancla})")
    aux = df.filter((df['id_mes']>ancla)&(df['id_mes']<=(ancla+vdes))).select(*['stock','date','close','id_mes'])
    aux.registerTempTable('aux')
    query = f"""
            with max_fh as (
          select 
            stock, 
            max(date) as date 
          from 
            aux 
          group by 
            stock
        ) 
        select 
          stock, 
          {ancla} as ancla,
          close as cierre_sig_mes 
        from 
          aux 
          inner join max_fh using(stock, date)

        """
    ancla = "%03d"%ancla
    sqlContext.sql(query).write.parquet(f'/stocks_feature/Y_{ancla}.parquet',mode='overwrite') 

In [177]:
await asyncio.gather(*map(lambda ancla:ingY(df,ancla),range(anclai,anclai+5)))

generando Y (12)
generando Y (13)
generando Y (14)
generando Y (15)
generando Y (16)


[None, None, None, None, None]

In [190]:
best = ['x_max_high_3','x_min_ratio_volume_high_12','x_min_ratio_open_dif_9','x_min_ratio_high_low_9']

In [192]:
%%time 
X = spark.read.parquet('/stocks_feature/X_*.parquet')
y = spark.read.parquet('/stocks_feature/Y_*.parquet')
tad = X.join(y,um,'inner').select(*(um+best+['cierre_sig_mes']))
tad.printSchema()

root
 |-- stock: string (nullable = true)
 |-- ancla: integer (nullable = true)
 |-- x_max_high_3: double (nullable = true)
 |-- x_min_ratio_volume_high_12: double (nullable = true)
 |-- x_min_ratio_open_dif_9: double (nullable = true)
 |-- x_min_ratio_high_low_9: double (nullable = true)
 |-- cierre_sig_mes: double (nullable = true)

CPU times: user 10.4 ms, sys: 0 ns, total: 10.4 ms
Wall time: 654 ms


In [193]:
train,valid = train_test_split(tad.toPandas(),train_size=0.7)
train,valid = spark.createDataFrame(train),spark.createDataFrame(valid)

In [194]:
# Imputación
imp = Imputer(strategy='median',inputCols=best,outputCols=best)
imp = imp.fit(train)
Xi = imp.transform(train)

In [198]:
# Vectorización
assembler = VectorAssembler(inputCols=best,outputCol='vector')
v = assembler.transform(Xi)
v.printSchema()

In [223]:
# Modelo
mod = LinearRegression(featuresCol='vector',labelCol='cierre_sig_mes')

In [224]:
mod = mod.fit(v)

In [225]:
ev = RegressionEvaluator(labelCol='cierre_sig_mes',metricName='mae')

In [226]:
ev.evaluate(mod.transform(v))

110.30206997418219

In [227]:
ev.evaluate(mod.transform(assembler.transform(imp.transform(valid))))

74.04249293540894