In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.functions import vector_to_array

In [0]:
# Load every source table of the hackathon. The tweet text column is renamed
# from "tweet" to "body" straight away so that tweets and reddits use the same
# name for their main text field.
date, tweets, reddits, company_values, companies, market = (
  spark.read.table(table_name)
  for table_name in (
    'herhackathon.date_table',
    'herhackathon.tweets',
    'herhackathon.reddits',
    'herhackathon.company_values',
    'herhackathon.tickers_processed_kaja',
    'herhackathon.market_processed',
  )
)
tweets = tweets.withColumnRenamed("tweet", "body")

In [0]:
# Give every post a string id: reddit rows derive theirs from the "index"
# column, tweets keep their own "id". The "r_" / "t_" prefixes keep ids from
# the two sources from ever colliding.
reddits = reddits.withColumn("id", F.concat(F.lit("r_"), reddits["index"]))
tweets = tweets.withColumn("id", F.concat(F.lit("t_"), tweets["id"]))

In [0]:
# The "index" column is not part of the exported tables; remove it everywhere
# it is expected (reddit ids have already been derived from it).
date, reddits, companies, market = (
  frame.drop("index") for frame in (date, reddits, companies, market)
)

### Drop rows with null values in important columns

In [0]:
# Discard tweets lacking an author or text, and reddit posts lacking a title.
tweets = tweets.na.drop(subset=["writer", "body"])
reddits = reddits.na.drop(subset=["title"])

### Drop duplicates

In [0]:
# Tweets: remove exact duplicate rows. Their "id" comes from the source data,
# so repeated copies of the same tweet presumably share it and still compare
# equal (NOTE(review): confirm the source tweet id is stable across copies).
tweets = tweets.dropDuplicates()

# Reddits: "id" was synthesized from the row "index", so it is presumably
# unique per row and would make every row look distinct. Compare all other
# columns instead so repeated posts are actually removed.
reddits = reddits.dropDuplicates([c for c in reddits.columns if c != "id"])

### Process date and timestamp columns

In [0]:
# Convert the day columns of company_values and market to Spark DateType.
company_values = company_values.withColumn("day_date", F.col("day_date").cast(T.DateType()))
market = market.withColumn("date", F.col("date").cast(T.DateType()))

In [0]:
# Tweets: keep the first 19 characters of "created_at" (the part matching
# "yyyy-MM-dd HH:mm:ss"), parse them into "timestamp", drop the raw column and
# derive the calendar "date". The string "date" column is created before
# "timestamp" and later overwritten in place, which fixes the output column order.
tweets = (
  tweets
  .withColumn("date", F.substring("created_at", 1, 19))  # Spark substring is 1-based (0 behaves like 1)
  .withColumn("timestamp", F.to_timestamp("date", "yyyy-MM-dd HH:mm:ss"))
  .drop("created_at")
  .withColumn("date", F.to_date("timestamp"))
)

# Reddits: parse "date" into a timestamp, then truncate that back to a date.
reddits = (
  reddits
  .withColumn("timestamp", F.to_timestamp("date"))
  .withColumn("date", F.to_date("timestamp"))
)

### Compute impact factors

In [0]:
# Weighted engagement score for tweets: a comment is worth 5 likes and a
# retweet is worth 10 likes.
twitter_coefficients = {"likes": 1, "comments": 5, "retweets": 10}

# Treat missing engagement counts as zero so one missing count does not make
# the whole weighted sum null.
tweets = tweets.fillna(0, subset=["like_num", "comment_num", "retweet_num"])

tweet_impact = (
  F.col("like_num") * twitter_coefficients["likes"]
  + F.col("comment_num") * twitter_coefficients["comments"]
  + F.col("retweet_num") * twitter_coefficients["retweets"]
)
tweets = tweets.withColumn("impact_factor", tweet_impact)

In [0]:
# Weighted engagement score for reddit posts: one comment weighs as much as
# five points of score.
reddit_coefficients = {
  "likes": 1,
  "comments": 5,
}

# Treat missing counts as zero, exactly like the tweet counts. Otherwise a null
# count makes impact_factor null, and the VectorAssembler used when rescaling
# (default handleInvalid="error") fails on null inputs.
reddits = reddits.fillna(0, subset=["score", "num_comments"])

reddits = (
  reddits
  .withColumn("impact_factor",
              reddit_coefficients["likes"] * F.col("score") +
              reddit_coefficients["comments"] * F.col("num_comments")
             )
)

### Rescale the impact factor of both tables to unit standard deviation so tweets and reddits are comparable

In [0]:
def rescale_impact_factor(df, column="impact_factor"):
  """Return ``df`` with ``column`` divided by its standard deviation.

  Uses ``StandardScaler(withStd=True, withMean=False)``, which divides by the
  corrected sample standard deviation without centring. Zeros stay zero and
  signs are preserved. The result is a comparable scale across tables, not a
  fixed range. If the standard deviation is zero, Spark outputs 0.0.

  Args:
    df: Spark DataFrame containing the numeric ``column``. Nulls in ``column``
      make ``VectorAssembler`` raise (its default ``handleInvalid="error"``),
      so fill them beforehand.
    column: name of the column to rescale. Defaults to ``"impact_factor"``.

  Returns:
    ``df`` with ``column`` replaced, at the same position, by its rescaled
    double values. No helper columns are left behind.
  """
  vec_col = f"{column}_vec"
  scaled_col = f"{column}_scaled"

  # StandardScaler only accepts vector input, so wrap the scalar in a 1-element vector.
  assembler = VectorAssembler(inputCols=[column], outputCol=vec_col)
  scaler = StandardScaler(inputCol=vec_col, outputCol=scaled_col,
                          withStd=True, withMean=False)

  assembled_df = assembler.transform(df)
  scaled_df = scaler.fit(assembled_df).transform(assembled_df)

  # The scaled vector has exactly one element: read it directly instead of
  # exploding, which suggests row multiplication and drops rows on null arrays.
  return (
    scaled_df
    .withColumn(column, vector_to_array(scaled_col)[0])
    .drop(vec_col, scaled_col)
  )

In [0]:
# Rescale each source independently (tweets first, then reddits), so each
# table is scaled by its own standard deviation.
tweets, reddits = map(rescale_impact_factor, (tweets, reddits))

### Clean text columns and drop near-empty posts

In [0]:
# Replace line breaks and tabs in every exported free-text column with spaces,
# for easier loading in PowerBI. The files are written tab-separated, so
# embedded newlines/tabs can split rows or fields.
SINGLE_LINE_PATTERN = "[\r\n\t]"
MIN_TEXT_LENGTH = 3  # posts whose text has this many characters or fewer are dropped

# Combined reddit text; concat_ws skips a null body instead of nulling the result.
reddits = reddits.withColumn("content", F.concat_ws(" ", "title", "body"))
for text_col in ("content", "title", "body"):
  reddits = reddits.withColumn(text_col, F.regexp_replace(F.col(text_col), SINGLE_LINE_PATTERN, " "))
tweets = tweets.withColumn("body", F.regexp_replace(F.col("body"), SINGLE_LINE_PATTERN, " "))

# Tweet text was renamed from "tweet" to "body" when the table was loaded, so
# the length filter must use "body" ("tweet" no longer exists).
tweets = tweets.filter(F.length(F.col("body")) > MIN_TEXT_LENGTH)
reddits = reddits.filter(F.length(F.col("content")) > MIN_TEXT_LENGTH)

### Write output dataframes

In [0]:
def save_dataframe(df, name, base_path="/mnt/herhackathon/processed"):
  """Write ``df`` as tab-separated CSV with a header row, overwriting old output.

  Spark writes a *directory* at ``<base_path>/<name>.csv``. ``coalesce(1)``
  makes that directory hold a single ``part-*`` data file, plus any commit
  marker files Spark adds.

  Args:
    df: Spark DataFrame to write.
    name: output name; the data lands in ``<name>.csv`` under ``base_path``.
    base_path: directory to write into. Defaults to the processed-data mount
      this notebook has always used. A trailing slash is tolerated.
  """
  output_path = f"{base_path.rstrip('/')}/{name}.csv"
  (
    df
    .coalesce(1)  # one partition -> one part file, at the cost of write parallelism
    .write
    .format('csv')
    .option("header", True)
    .option("sep", "\t")
    .mode("overwrite")
    .save(output_path)
  )

In [0]:
# Export every processed table under its output name, in a fixed order.
for output_name, output_df in [
  ("date", date),
  ("company_values", company_values),
  ("companies", companies),
  ("market", market),
  ("tweets", tweets),
  ("reddits", reddits),
]:
  save_dataframe(output_df, output_name)