In [1]:
# !pip install findspark

In [2]:
import findspark

# Runs before the `pyspark` imports in the next cell.
# NOTE(review): presumably this is what makes `pyspark` importable here
# (findspark locates the local Spark install) — confirm for your environment.
findspark.init()


In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import from_json
from pyspark.sql.types import *


# Create the Spark session used by every cell below, or attach to one that
# is already running under the same application.
spark = (
    SparkSession.builder
    .appName('pre_process_media_data')
    .getOrCreate()
)

# NOTE(review): the session time zone is left at its default, so the epoch
# values derived later presumably depend on the machine's local zone.
# Confirm whether `time_published` is UTC before enabling the line below.
# spark.conf.set("spark.sql.session.timeZone", "UTC")



Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/22 18:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:

# Schema of the raw news CSV files. Every field is read as a plain string;
# the JSON held in `ticker_sentiment` is parsed in a later cell.
schema = StructType([
    StructField("title", StringType()),
    StructField("url", StringType()),
    StructField("time_published", StringType()),
    StructField("authors", StringType()),
    StructField("summary", StringType()),
    StructField("banner_image", StringType()),
    StructField("source", StringType()),
    StructField("category_within_source", StringType()),
    StructField("source_domain", StringType()),
    StructField("topics", StringType()),
    StructField("overall_sentiment_score", StringType()),
    StructField("overall_sentiment_label", StringType()),
    StructField("ticker_sentiment", StringType()),
])

# Read csv files: one sub-directory per ticker, any number of files in each.
# Quotes inside quoted fields are escaped by doubling them, and fields may
# span several lines.
df = (
    spark.read
    .option("header", True)
    .option("multiline", True)
    .option("quote", "\"")
    .option("escape", "\"")
    .csv("./data/alpha_vantage/news_data/*/*", schema=schema)
    # Only these three columns are used downstream.
    .select(F.col("url"), F.col("time_published"), F.col("ticker_sentiment"))
    # Full URI of the file each row was read from.
    .withColumn("filename", F.input_file_name())
)

# The ticker is the directory that directly contains the file, i.e. the
# second-to-last path component. Taking it relative to the END of the path
# (instead of a fixed index from the start) keeps this correct regardless of
# where the project is checked out or how the file URI is prefixed.
df = df.withColumn("file_ticker", F.element_at(F.split(F.col("filename"), "/"), -2))

df.printSchema()

# Make the frame available to the spark.sql(...) cells below.
df.createOrReplaceTempView("df")

                                                                                

root
 |-- url: string (nullable = true)
 |-- time_published: string (nullable = true)
 |-- ticker_sentiment: string (nullable = true)
 |-- filename: string (nullable = false)
 |-- file_ticker: string (nullable = true)



In [5]:
# Count how many rows (article records) were loaded per ticker directory and
# rank the tickers by that count (ROWNUM = 1 is the ticker with most rows).
# `file_ticker` is used as a secondary sort key so that tickers with equal
# counts always receive the same rank: this frame is evaluated lazily and
# re-computed by every downstream action, and an arbitrary tie order would
# let the ranking differ between those evaluations.
files_per_ticker_df = spark.sql("""
select  ROW_NUMBER() OVER (ORDER BY count(*) desc, file_ticker) AS ROWNUM,
        file_ticker, count(*)
from df
group by file_ticker
order by 3 desc, 2
""")

# +------+-----------+--------+
# |ROWNUM|file_ticker|count(1)|
# +------+-----------+--------+
# |1     |TSLA       |10227   |
# |2     |JPM        |9954    |
# |3     |AAPL       |9491    |
# |4     |NFLX       |8994    |
# |5     |BAC        |8634    |
# |6     |MSFT       |8617    |
# |7     |WFC        |7666    |
# |8     |META       |7402    |
# |9     |WMT        |7018    |
# |10    |AMZN       |6792    |
# |11    |PFE        |5708    |
# |12    |NVDA       |5684    |
# |13    |XOM        |3814    |
# |14    |NKE        |3670    |
# |15    |KO         |3376    |
# |16    |JNJ        |3171    |
# |17    |VZ         |2875    |
# |18    |MA         |2611    |
# |19    |COST       |2560    |
# |20    |DIS        |2472    |
# |21    |ABBV       |2430    |
# |22    |CVX        |2378    |
# |23    |HD         |2216    |
# |24    |PEP        |2180    |
# |25    |PG         |1812    |
# |26    |MCD        |1757    |
# |27    |CSCO       |1724    |
# |28    |UNH        |1681    |
# |29    |AVGO       |1649    |
# |30    |ACN        |1557    |
# |31    |V          |1375    |
# |32    |ADBE       |963     |
# |33    |ABT        |932     |
# |34    |TMO        |760     |
# |35    |PM         |722     |
# |36    |LLY        |703     |
# |37    |DHR        |443     |
# |38    |TXN        |434     |
# |39    |LIN        |213     |
# +------+-----------+--------+

# Register the ranking as a temp view so later spark.sql(...) cells can query it by name.
files_per_ticker_df.createOrReplaceTempView('files_per_ticker_df')

In [6]:
# Layout of the JSON stored in `ticker_sentiment`: an array with one object
# per ticker. A dedicated name is used so the CSV `schema` defined earlier is
# not silently replaced by an unrelated type.
ticker_sentiment_schema = ArrayType(
    StructType([
        StructField("ticker", StringType()),
        StructField("relevance_score", StringType()),
        StructField("ticker_sentiment_score", StringType()),
        StructField("ticker_sentiment_label", StringType()),
    ])
)

# Parse the JSON, emit one row per array element, and keep only the ticker
# symbol. The score/label fields are not used by this notebook, so they are
# no longer materialised as throw-away columns.
# Note: explode() emits no row for a null or empty array, so articles without
# a parsable `ticker_sentiment` value disappear here.
df = (
    df
    .withColumn(
        "ticker_sentiment_item",
        F.explode(F.from_json(F.col("ticker_sentiment"), ticker_sentiment_schema)),
    )
    .select(
        F.col("url"),
        F.col("time_published"),
        F.col("ticker_sentiment_item").getItem("ticker").alias("ticker"),
    )
)

df.printSchema()
# Replace the earlier `df` view with the exploded (url, time_published, ticker) rows.
df.createOrReplaceTempView('df')

root
 |-- url: string (nullable = true)
 |-- time_published: string (nullable = true)
 |-- ticker: string (nullable = true)



In [7]:
# Number of highest-ranked tickers (by ROWNUM in files_per_ticker_df) that
# are taken into account; tickers ranked below this are ignored.
TOP_TICKER_COUNT = 35

# For every article (url) count how many distinct top-ranked tickers it mentions.
#   top_tickers : the TOP_TICKER_COUNT best-ranked tickers.
#   s1          : one row per (url, ticker), keeping the earliest
#                 time_published seen for that pair (removes duplicates).
# The inner join drops mentions of tickers outside the top list, so articles
# that mention none of them do not appear in the result.
ticker_count_per_article_df = spark.sql(f"""
with top_tickers
as 
(
    select * 
    from files_per_ticker_df
    where ROWNUM <= {TOP_TICKER_COUNT}
),
s1
as
(
    select url,
           ticker,
           min(time_published) as time_published     
    from   df
    group by 1,2
)

select s1.url,
       s1.time_published,
       count(s1.ticker) as ticker_count_per_article
from   s1 join top_tickers
       on s1.ticker = top_tickers.file_ticker 
group by 1,2

""")

ticker_count_per_article_df.createOrReplaceTempView("ticker_count_per_article_df")

ticker_count_per_article_df.printSchema()

root
 |-- url: string (nullable = true)
 |-- time_published: string (nullable = true)
 |-- ticker_count_per_article: long (nullable = false)



In [8]:
# Derive date / timestamp / epoch columns from the raw `time_published` string.
# The pieces are cut out by fixed character position; F.substring takes a
# 1-based start position and a length.
# NOTE(review): the offsets skip position 9, so the value presumably looks
# like `yyyyMMdd?HHmmss` with a one-character separator — confirm against the data.
published_raw = F.col("time_published")

year_part = F.substring(published_raw, 1, 4)      # year
month_part = F.substring(published_raw, 5, 2)     # month
day_part = F.substring(published_raw, 7, 2)       # day of month
hour_part = F.substring(published_raw, 10, 2)     # hour
minute_part = F.substring(published_raw, 12, 2)   # minute
second_part = F.substring(published_raw, 14, 2)   # second

# Date of the article as text, joined with '-'.
published_date = F.concat_ws('-', year_part, month_part, day_part)

# Time of the article as text, joined with ':'.
published_time = F.concat_ws(':', hour_part, minute_part, second_part)

# "<date> <time>" parsed into a timestamp column.
published_ts = F.to_timestamp(F.concat_ws(' ', published_date, published_time))

# Keep the url and the count, plus the derived date, timestamp and epoch seconds.
ticker_count_per_article_df = ticker_count_per_article_df.select(
    F.col("url"),
    published_date.alias("t_article_published_date"),
    published_ts.alias("t_article_published_ts"),
    F.unix_timestamp(published_ts).alias("t_article_published_epoch"),
    F.col("ticker_count_per_article"),
)

# Re-register the view so the SQL cells below see the new columns.
ticker_count_per_article_df.createOrReplaceTempView("ticker_count_per_article_df")

ticker_count_per_article_df.printSchema()


root
 |-- url: string (nullable = true)
 |-- t_article_published_date: string (nullable = false)
 |-- t_article_published_ts: timestamp (nullable = true)
 |-- t_article_published_epoch: long (nullable = true)
 |-- ticker_count_per_article: long (nullable = false)



In [9]:
# Attach to every article one bucket key per time resolution (1, 5, 10, 15 and
# 30 minutes), expressed in epoch seconds.
#
# How the keys are built:
#   * round_epoch truncates the publication epoch DOWN to the start of its
#     minute / 5 / 10 / 15 / 30-minute bucket (divide, cast to int, multiply).
#   * The 1-minute key is that truncated minute as-is.
#   * For the wider resolutions the truncated bucket start is kept when it is
#     not earlier than the truncated minute; otherwise one bucket length is
#     added, i.e. the truncated minute is rounded UP to the next boundary.
#
# NOTE(review): because the comparison uses the truncated minute, the seconds
# of the publication time are always rounded down (the 1-minute key is never
# rounded up, despite the column name) — confirm this is intended.
#
# The result is cached: it feeds five aggregations, each of which is
# summarised and written out, and without the cache every one of those
# actions would recompute the whole pipeline starting from the CSV files.
before_agg_df = spark.sql("""

with round_epoch
as
(
select url,
       t_article_published_ts,
       t_article_published_epoch,
       cast((t_article_published_epoch/60) as int)*60                 as  t_nearest_min_epoch,
       cast((t_article_published_epoch/(60*5)) as int)*(60*5)         as  t_nearest_5min_epoch,
       cast((t_article_published_epoch/(60*10)) as int)*(60*10)       as  t_nearest_10min_epoch,
       cast((t_article_published_epoch/(60*15)) as int)*(60*15)       as  t_nearest_15min_epoch,
       cast((t_article_published_epoch/(60*30)) as int)*(60*30)       as  t_nearest_30min_epoch,
       ticker_count_per_article          
from ticker_count_per_article_df

)
select url                                                as url,
       t_nearest_min_epoch                                as t_round_up_min_epoch,
       case 
            when t_nearest_5min_epoch<t_nearest_min_epoch 
                then t_nearest_5min_epoch+(60*5)
            else t_nearest_5min_epoch
       end                                                as t_round_up_5min_epoch,
       case 
            when t_nearest_10min_epoch<t_nearest_min_epoch 
                then t_nearest_10min_epoch+(60*10)
            else t_nearest_10min_epoch
       end                                                as t_round_up_10min_epoch,
       case 
            when t_nearest_15min_epoch<t_nearest_min_epoch 
                then t_nearest_15min_epoch+(60*15)
            else t_nearest_15min_epoch
       end                                                as t_round_up_15min_epoch,
       case 
            when t_nearest_30min_epoch<t_nearest_min_epoch 
                then t_nearest_30min_epoch+(60*30)
            else t_nearest_30min_epoch
       end                                                as t_round_up_30min_epoch,
       ticker_count_per_article                           as ticker_count_per_article
from round_epoch

""").cache()

before_agg_df.createOrReplaceTempView("before_agg_df")

In [10]:
# Print the column types of before_agg_df; the epoch keys are 32-bit integers
# because they are produced with `cast(... as int)`.
before_agg_df.printSchema()

root
 |-- url: string (nullable = true)
 |-- t_round_up_min_epoch: integer (nullable = true)
 |-- t_round_up_5min_epoch: integer (nullable = true)
 |-- t_round_up_10min_epoch: integer (nullable = true)
 |-- t_round_up_15min_epoch: integer (nullable = true)
 |-- t_round_up_30min_epoch: integer (nullable = true)
 |-- ticker_count_per_article: long (nullable = false)



In [11]:
# Aggregate data for the 1 min resolution data set: total ticker mentions
# per 1-minute epoch key.

agg_1min_data = (
    spark.table("before_agg_df")
    .groupBy(F.col("t_round_up_min_epoch").alias("epoch_key"))
    .agg(F.sum("ticker_count_per_article").alias("snp_ticker_count_epoch"))
)

agg_1min_data.printSchema()

agg_1min_data.summary().show()


root
 |-- epoch_key: integer (nullable = true)
 |-- snp_ticker_count_epoch: long (nullable = true)



23/04/22 18:22:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:22:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:22:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:22:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-------+-------------------+----------------------+
|summary|          epoch_key|snp_ticker_count_epoch|
+-------+-------------------+----------------------+
|  count|              75243|                 75243|
|   mean|1.661516264525338E9|     2.073774304586473|
| stddev|  8742703.295124054|    2.1689905026486476|
|    min|         1646114400|                     1|
|    25%|         1653905700|                     1|
|    50%|         1661398320|                     1|
|    75%|         1668690960|                     2|
|    max|         1677437460|                    86|
+-------+-------------------+----------------------+



In [12]:
# Aggregate data for the 5 min resolution data set: total ticker mentions
# per 5-minute epoch key.

agg_5min_data = (
    spark.table("before_agg_df")
    .groupBy(F.col("t_round_up_5min_epoch").alias("epoch_key"))
    .agg(F.sum("ticker_count_per_article").alias("snp_ticker_count_epoch"))
)

agg_5min_data.printSchema()

agg_5min_data.summary().show()


root
 |-- epoch_key: integer (nullable = true)
 |-- snp_ticker_count_epoch: long (nullable = true)



23/04/22 18:22:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 43:>                                                       (0 + 10) / 10]

+-------+-------------------+----------------------+
|summary|          epoch_key|snp_ticker_count_epoch|
+-------+-------------------+----------------------+
|  count|              48436|                 48436|
|   mean|1.661711178003964E9|    3.2215087951110744|
| stddev|   8807269.59170083|    3.2075399973287673|
|    min|         1646114400|                     1|
|    25%|         1654087200|                     1|
|    50%|         1661675400|                     2|
|    75%|         1669105500|                     4|
|    max|         1677437700|                    87|
+-------+-------------------+----------------------+



                                                                                

In [13]:
# Aggregate data for the 10 min resolution data set: total ticker mentions
# per 10-minute epoch key.

agg_10min_data = (
    spark.table("before_agg_df")
    .groupBy(F.col("t_round_up_10min_epoch").alias("epoch_key"))
    .agg(F.sum("ticker_count_per_article").alias("snp_ticker_count_epoch"))
)

agg_10min_data.printSchema()

agg_10min_data.summary().show()


root
 |-- epoch_key: integer (nullable = true)
 |-- snp_ticker_count_epoch: long (nullable = true)



23/04/22 18:23:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 70:>                                                       (0 + 10) / 10]

+-------+--------------------+----------------------+
|summary|           epoch_key|snp_ticker_count_epoch|
+-------+--------------------+----------------------+
|  count|               33038|                 33038|
|   mean|1.6618145357103941E9|     4.722955384708517|
| stddev|   8852893.345211979|    4.5157458798135615|
|    min|          1646114400|                     1|
|    25%|          1654152600|                     2|
|    50%|          1661841600|                     3|
|    75%|          1669256400|                     6|
|    max|          1677438000|                    87|
+-------+--------------------+----------------------+



                                                                                

In [14]:
# Aggregate data for the 15 min resolution data set: total ticker mentions
# per 15-minute epoch key.

agg_15min_data = (
    spark.table("before_agg_df")
    .groupBy(F.col("t_round_up_15min_epoch").alias("epoch_key"))
    .agg(F.sum("ticker_count_per_article").alias("snp_ticker_count_epoch"))
)

agg_15min_data.printSchema()

agg_15min_data.summary().show()


root
 |-- epoch_key: integer (nullable = true)
 |-- snp_ticker_count_epoch: long (nullable = true)



23/04/22 18:23:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 97:>                                                       (0 + 10) / 10]

+-------+--------------------+----------------------+
|summary|           epoch_key|snp_ticker_count_epoch|
+-------+--------------------+----------------------+
|  count|               25093|                 25093|
|   mean|1.6618471986769218E9|     6.218347746383453|
| stddev|   8881403.978248285|    5.8359004827999685|
|    min|          1646114400|                     1|
|    25%|          1654159500|                     2|
|    50%|          1661882400|                     4|
|    75%|          1669359600|                     9|
|    max|          1677438000|                    98|
+-------+--------------------+----------------------+



                                                                                

In [15]:
# Aggregate data for the 30 min resolution data set: total ticker mentions
# per 30-minute epoch key.

agg_30min_data = (
    spark.table("before_agg_df")
    .groupBy(F.col("t_round_up_30min_epoch").alias("epoch_key"))
    .agg(F.sum("ticker_count_per_article").alias("snp_ticker_count_epoch"))
)

agg_30min_data.printSchema()

agg_30min_data.summary().show()


root
 |-- epoch_key: integer (nullable = true)
 |-- snp_ticker_count_epoch: long (nullable = true)



23/04/22 18:23:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:23:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-------+--------------------+----------------------+
|summary|           epoch_key|snp_ticker_count_epoch|
+-------+--------------------+----------------------+
|  count|               14729|                 14729|
|   mean|1.6618985156765563E9|     10.59386244823138|
| stddev|   8926061.065160422|      9.70017137121795|
|    min|          1646114400|                     1|
|    25%|          1654149600|                     3|
|    50%|          1661970600|                     7|
|    75%|          1669554000|                    16|
|    max|          1677438000|                   109|
+-------+--------------------+----------------------+



In [18]:

import os

# write data set into DWH
# Base directory of the aggregated media data. The default is the original
# location; set the MEDIA_DWH_DIR environment variable to write elsewhere
# without editing the notebook.
MEDIA_DWH_DIR = os.environ.get(
    "MEDIA_DWH_DIR",
    "/Users/ybatash/Workspace/tau.ac.il/tau-final-project/data/DWH/media_aggregated_data",
)

# One output folder per time resolution, named `interval=<resolution>`.
aggregates_by_interval = {
    "1min": agg_1min_data,
    "5min": agg_5min_data,
    "10min": agg_10min_data,
    "15min": agg_15min_data,
    "30min": agg_30min_data,
}

for interval_name, interval_df in aggregates_by_interval.items():
    # "overwrite" replaces the output of a previous run; with the default
    # mode the write raises as soon as the target folder already exists,
    # which makes the notebook fail on every re-run.
    interval_df.write.mode("overwrite").parquet(f"{MEDIA_DWH_DIR}/interval={interval_name}/")


23/04/22 18:25:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:25:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:25:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:25:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:25:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 18:25:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/04/22 1