In [None]:
pip install finta

In [1]:
import findspark
import pandas as pd

# findspark.init() has to run before the first pyspark import: it is the call
# that makes the Spark installation importable in the first place.
findspark.init()

from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) a local-mode Spark session called "raw-dataset" with the
# PostgreSQL JDBC jar on the driver class path, then keep a handle on its
# SparkContext.
spark = (
    SparkSession.builder
    .master("local")
    .appName("raw-dataset")
    .config(
        "spark.driver.extraClassPath",
        "/home/jovyan/work/jars/postgresql-9.4.1207.jar",
    )
    .getOrCreate()
)
sc = spark.sparkContext

22/01/06 14:45:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read the raw CSV (header row, inferred column types, comma separated).
df = (
    spark.read
    .options(header='True', inferSchema='True', delimiter=',')
    .csv('/home/jovyan/work/data/dataset/raw.csv')
)
# The first column is the exported index; drop it by position.
df = df.drop(df[0])

                                                                                

In [4]:
# Rename columns to the lower-case names the finta API requires
import pyspark.sql.functions as F
# Old column name -> new column name; columns not listed keep their name.
columns = {
    'Close': 'close',
    'High': 'high',
    'Low': 'low',
    'Volume': 'volume',
    'Open': 'open',
}

def rename_columns(df, columns):
    """Return ``df`` with its columns renamed according to ``columns``.

    :param df: DataFrame exposing ``.columns`` and ``.select``.
    :param columns: dict mapping an existing column name to its new name;
        columns that are not keys of the dict keep their current name.
    :return: the result of ``df.select`` over every column of ``df``, each one
        aliased to its (possibly unchanged) name, in the original order.
    :raises ValueError: if ``columns`` is not a dict.
    """
    if not isinstance(columns, dict):
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

    aliased = []
    for current_name in df.columns:
        new_name = columns.get(current_name, current_name)
        aliased.append(F.col(current_name).alias(new_name))
    return df.select(*aliased)

# Apply the renaming map to the loaded frame.
df = rename_columns(df, columns=columns)

In [5]:
from pyspark.sql.functions import to_timestamp

# Replace the Date column with its value parsed as a timestamp.
df = df.withColumn("Date", to_timestamp(df["Date"]))

In [6]:
def _get_indicator_grouped_data(key, group):
    """Append finta technical indicators to the price rows of one group.

    Written to be passed to ``groupBy(...).applyInPandas``.

    :param key: grouping key handed over by the caller; not used here.
    :param group: pandas DataFrame holding ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``Adj Close`` columns (plus any others,
        which are kept).
    :return: a new pandas DataFrame with ``open``, ``high``, ``low``,
        ``volume`` and ``Adj Close`` removed and the indicator columns
        appended in ``INDICATORS`` order. The frame is rebuilt from
        ``.values``, so its column labels are 0..N-1 and the original names
        are not carried over.
    :raises KeyError: if one of the columns to remove is missing.
    """
    from finta import TA

    INDICATORS = ['RSI', 'STOCH', 'ADL', 'ATR', 'MOM', 'MFI', 'ROC', 'OBV', 'CCI', 'EMV', 'WILLIAMS', 'ADX', 'TRIX']

    # NOTE(review): the indicators are computed over the rows in the order they
    # arrive; presumably each group is already sorted by date -- confirm.
    for indicator in INDICATORS:
        # Look the indicator up by name rather than eval()-ing a built string.
        ind_data = getattr(TA, indicator)(group)
        if not isinstance(ind_data, pd.DataFrame):
            ind_data = ind_data.to_frame()
        # Merge every result. Previously the merge sat inside the branch above,
        # so an indicator that returned a DataFrame was silently dropped.
        group = group.merge(ind_data, left_index=True, right_index=True)

    # Raw price/volume inputs are no longer needed once the indicators exist.
    group = group.drop(columns=['open', 'high', 'low', 'volume', 'Adj Close'])

    # To debug the output layout, print(group.columns) here and compare it
    # with the schema given to applyInPandas.
    return pd.DataFrame(group.values)

#SCHEMA = "key string"  test schema
        

In [7]:
# The schema has to match the DataFrame returned by _get_indicator_grouped_data.
# Check group.columns there; some of the indicators may return null values.
SCHEMA = (
    "Date date, close double, symbol string, 14_period_RSI double, "
    "14_period_STOCH_K double, MFV double, 14_period_ATR double, MOM double, 14_period_MFI double, "
    "ROC double, OBV double, 20_period_CCI double, 14_period_EMV double, Williams double, "
    "14_period_ADX double, 20_period_TRIX double"
)


In [8]:
# Compute the indicators once per symbol; SCHEMA describes the returned frame.
df_indi = (
    df.groupBy("Symbol")
    .applyInPandas(_get_indicator_grouped_data, schema=SCHEMA)
)

In [9]:
def _produce_prediction(group):
    """Add the 'truth' label columns ``pred_3``, ``pred_5``, ``pred_7`` and ``pred_10``.

    For every look-ahead distance ``d`` in 3, 5, 7 and 10, ``pred_<d>`` is 1
    when ``close`` ``d`` rows further down is greater than or equal to the
    current ``close`` and 0 otherwise.

    :param group: pandas DataFrame with a ``close`` column; it is not modified.
        Rows are assumed to be in chronological order -- TODO confirm.
    :return: a new pandas DataFrame holding the input columns followed by the
        four label columns, restricted to rows without any missing value (so
        the last 10 rows, which have no ``pred_10``, are never returned). The
        frame is rebuilt from ``.values``, so its column labels are 0..N-1.
    """
    horizons = [3, 5, 7, 10]
    label_columns = ['pred_' + str(d) for d in horizons]

    labelled = group.copy()
    for d, column in zip(horizons, label_columns):
        went_up = labelled['close'].shift(-d) >= labelled['close']
        # The last d rows have nothing to compare against: leave them out so
        # they become NaN when the shorter series is aligned on assignment.
        labelled[column] = went_up.iloc[:-d].astype(int)

    # Drop incomplete rows once, after every label exists. Dropping inside the
    # loop trimmed the tail again for each horizon (3 + 5 + 7 + 10 rows).
    labelled = labelled.dropna()
    # The NaN padding turned the labels into floats; restore integers.
    labelled = labelled.astype({column: int for column in label_columns})

    return pd.DataFrame(labelled.values)

#data = _produce_prediction(group)
#del (data['close'])
#data = data.dropna() # Some indicators produce NaN values for the first few rows, we just remove them here
#data.tail()

In [10]:
# The schema has to match the DataFrame returned by _produce_prediction:
# the indicator columns followed by the four pred_<d> label columns.
SCHEMA_PREDICTION = (
    "Date date, close double, symbol string, 14_period_RSI double, "
    "14_period_STOCH_K double, MFV double, 14_period_ATR double, MOM double, 14_period_MFI double, "
    "ROC double, OBV double, 20_period_CCI double, 14_period_EMV double, Williams double, "
    "14_period_ADX double, 20_period_TRIX double,"
    "pred_3 int, pred_5 int, pred_7 int, pred_10 int"
)

# Label each symbol's rows; SCHEMA_PREDICTION describes the returned frame.
df_prediction = (
    df_indi.groupBy("Symbol")
    .applyInPandas(_produce_prediction, schema=SCHEMA_PREDICTION)
)

In [16]:
# Coalesce to one partition and write the labelled dataset as CSV with a header row.
(
    df_prediction
    .coalesce(1)
    .write
    .csv('/home/jovyan/work/data/dataset_final', header=True)
)

                                                                                