In [1]:
import findspark
findspark.find()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [2]:
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext("local", "Simple App")
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [3]:
df = sqlContext.read.parquet("day_data_data2018-12-15 00_00_00.parquet")
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
# df = df.withColumnRenamed("stock_price_col", "label")
df.limit(5).toPandas().head()

Unnamed: 0,index,text,tweet_count,date_col,stock_price_col
0,3106,"RT @VICE: How I quit Apple, Microsoft, Google,...",209,2018-12-13 19:01:00,1657.76
1,3107,"RT @JessSFrankel: ""How about I light your lamp...",187,2018-12-13 19:02:00,1657.76
2,3108,RT @DeepStateExpose: Grab a copy of my NEW gro...,169,2018-12-13 19:03:00,1657.76
3,3109,RT @cloutboyjojoo: UPS: your package arrived i...,226,2018-12-13 19:04:00,1657.76
4,3110,RT @cloutboyjojoo: UPS: your package arrived i...,221,2018-12-13 19:05:00,1657.76


In [5]:
from pyspark.sql.types import IntegerType

df = df.limit(100)
# df = df.withColumn("tweet_count", df["tweet_count"].cast(IntegerType()))
df.printSchema()

root
 |-- index: long (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_count: long (nullable = true)
 |-- date_col: timestamp (nullable = true)
 |-- stock_price_col: double (nullable = true)



In [6]:
import re

pat1 = r'@[A-Za-z0-9_]+'
pat2 = r'https?://[^ ]+'
combined_pat = r'|'.join((pat1,pat2))
www_pat = r'www.[^ ]+'
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

def pre_processing(column):
    first_process = re.sub(r'|'.join((pat1,pat2)), '', column)
    second_process = re.sub(www_pat, '', first_process)
    third_process = second_process.lower()
    fourth_process = neg_pattern.sub(lambda x: negations_dic[x.group()], third_process)
    result = re.sub(r'[^A-Za-z ]','',second_process)
    return result.strip()

In [7]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

reg_replaceUdf = f.udf(pre_processing, StringType())

In [8]:
df.withColumn("tweet", reg_replaceUdf(df.text))

DataFrame[index: bigint, text: string, tweet_count: bigint, date_col: timestamp, stock_price_col: double, tweet: string]

In [9]:
from pyspark.ml.feature import Tokenizer, NGram, CountVectorizer, IDF, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

In [10]:
input_cols = ["1_tfidf", "2_tfidf", "3_tfidf", "4_tfidf", "5_tfidf", "ss_tweet_count"]

def build_pipeline():
    tokenizer = [Tokenizer(inputCol='text',outputCol='words')]
    ngrams = [NGram(n=i, inputCol='words', outputCol='{0}_grams'.format(i)) for i in range(1,6)]
    cv = [CountVectorizer(vocabSize=100000, inputCol='{0}_grams'.format(i), outputCol='{0}_tf'.format(i)) for i in range(1,6)]
    idf = [IDF(inputCol='{0}_tf'.format(i), outputCol='{0}_tfidf'.format(i), minDocFreq=5) for i in range(1,6)]
    tweetvect = [VectorAssembler(inputCols=["tweet_count"], outputCol="vec_tweet_count")]
    ss = [StandardScaler(inputCol="vec_tweet_count", outputCol="ss_tweet_count")]
    assembler = [VectorAssembler(inputCols=input_cols, outputCol='features')]
    pipeline = Pipeline(stages=tokenizer+ngrams+cv+idf+tweetvect+ss+assembler)
    return pipeline

In [11]:
pipeline = build_pipeline()
pipelineFit = pipeline.fit(df)

In [12]:
df = pipelineFit.transform(df)
df.limit(5).toPandas().head()

Unnamed: 0,index,text,tweet_count,date_col,stock_price_col,words,1_grams,2_grams,3_grams,4_grams,...,4_tf,5_tf,1_tfidf,2_tfidf,3_tfidf,4_tfidf,5_tfidf,vec_tweet_count,ss_tweet_count,features
0,3106,"RT @VICE: How I quit Apple, Microsoft, Google,...",209,2018-12-13 19:01:00,1657.76,"[rt, @vice:, how, i, quit, apple,, microsoft,,...","[rt, @vice:, how, i, quit, apple,, microsoft,,...","[rt @vice:, @vice: how, how i, i quit, quit ap...","[rt @vice: how, @vice: how i, how i quit, i qu...","[rt @vice: how i, @vice: how i quit, how i qui...",...,"(0.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, ...","(0.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0404095383378767, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",[209.0],[8.196391015478076],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,3107,"RT @JessSFrankel: ""How about I light your lamp...",187,2018-12-13 19:02:00,1657.76,"[rt, @jesssfrankel:, ""how, about, i, light, yo...","[rt, @jesssfrankel:, ""how, about, i, light, yo...","[rt @jesssfrankel:, @jesssfrankel: ""how, ""how ...","[rt @jesssfrankel: ""how, @jesssfrankel: ""how a...","[rt @jesssfrankel: ""how about, @jesssfrankel: ...",...,"(4.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, ...","(1.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.3636858450408903, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.2442216945483588, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.4254657748148339, 0.0, 0.0, 0.0, 0.0, 0.0, ...",[187.0],[7.3336130138488045],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,3108,RT @DeepStateExpose: Grab a copy of my NEW gro...,169,2018-12-13 19:03:00,1657.76,"[rt, @deepstateexpose:, grab, a, copy, of, my,...","[rt, @deepstateexpose:, grab, a, copy, of, my,...","[rt @deepstateexpose:, @deepstateexpose: grab,...","[rt @deepstateexpose: grab, @deepstateexpose: ...","[rt @deepstateexpose: grab a, @deepstateexpose...",...,"(10.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0,...","(8.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.5253239983923971, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.110554236370897, 0.0, 0.0, 0.0, 0.0, 0.0, 0...","(3.403726198518671, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",[169.0],[6.627703739788492],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,3109,RT @cloutboyjojoo: UPS: your package arrived i...,226,2018-12-13 19:04:00,1657.76,"[rt, @cloutboyjojoo:, ups:, your, package, arr...","[rt, @cloutboyjojoo:, ups:, your, package, arr...","[rt @cloutboyjojoo:, @cloutboyjojoo: ups:, ups...","[rt @cloutboyjojoo: ups:, @cloutboyjojoo: ups:...","[rt @cloutboyjojoo: ups: your, @cloutboyjojoo:...",...,"(12.0, 17.0, 17.0, 17.0, 17.0, 17.0, 17.0, 17....","(8.0, 17.0, 17.0, 17.0, 17.0, 17.0, 17.0, 17.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7273716900817806, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.7326650836450765, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.403726198518671, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",[226.0],[8.86308310764615],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,3110,RT @cloutboyjojoo: UPS: your package arrived i...,221,2018-12-13 19:05:00,1657.76,"[rt, @cloutboyjojoo:, ups:, your, package, arr...","[rt, @cloutboyjojoo:, ups:, your, package, arr...","[rt @cloutboyjojoo:, @cloutboyjojoo: ups:, ups...","[rt @cloutboyjojoo: ups:, @cloutboyjojoo: ups:...","[rt @cloutboyjojoo: ups: your, @cloutboyjojoo:...",...,"(9.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0...","(6.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.686962151743904, 0.0, 0.0, 0.0, 0.0, 0.0, 0...","(2.799498812733807, 0.0, 0.0, 0.0, 0.0, 0.0, 0...","(2.5527946488890034, 0.0, 0.0, 0.0, 0.0, 0.0, ...",[221.0],[8.666997198184951],"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [13]:
select_list = ["date_col", "features", "stock_price_col"]
df = df.select([column for column in df.columns if column in select_list])
df.limit(5).toPandas().head()

Unnamed: 0,date_col,stock_price_col,features
0,2018-12-13 19:01:00,1657.76,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,2018-12-13 19:02:00,1657.76,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,2018-12-13 19:03:00,1657.76,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,2018-12-13 19:04:00,1657.76,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,2018-12-13 19:05:00,1657.76,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
