In [26]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, BooleanType, TimestampType,
                              ArrayType, MapType, DateType)

from google.cloud import bigquery
from pyspark.sql.window import Window

spark = SparkSession \
        .builder.appName("DimNews") \
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.0") \
        .getOrCreate()

#load news DF
news_Df = spark \
        .read.option("recursiveFileLookup", "true") \
        .option("header", "true") \
        .parquet("gs://stocks-pipeline/raw-data/sock_news")


news_Df = news_Df.withColumn("newsKey", f.row_number().over(Window.orderBy('title')))
news_Df=news_Df.dropDuplicates(["id"])

news_Df = news_Df.drop("ticket")
news_Df = news_Df.withColumn('published_utc', f.split(news_Df['published_utc'], 'T').getItem(0))
news_Df = news_Df.withColumnRenamed("published_utc","PublishedUtc") \
            .withColumnRenamed("amp_url", "AmpUrl") \
            .withColumnRenamed("article_url", "ArticleUrl") \
            .withColumnRenamed("image_url", "ImageUrl") \
            .withColumnRenamed("id", "ArticleId")
            
news_Df = news_Df.withColumn("PublishedUtc",f.col("PublishedUtc").cast(DateType()))
news_Df = news_Df.fillna(value = 'Unknown', subset = ["AmpUrl"])

persist_df = news_Df.persist()

# persist_df = persist_df.drop("tickers")
persist_df = persist_df.drop("keywords")

print(persist_df.show())
    
    




23/03/17 20:12:23 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/03/17 20:12:23 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/03/17 20:12:23 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/03/17 20:12:23 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/03/17 20:12:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-------+
|              AmpUrl|          ArticleUrl|              author|         description|           ArticleId|            ImageUrl|PublishedUtc|           publisher|             tickers|               title|newsKey|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-------+
|https://www.zacks...|https://www.zacks...|Zacks Equity Rese...|The heavy selling...|--1KkCiSDNBqSQsAG...|https://staticx-t...|  2021-12-30|{name -> Zacks In...|              [AXLA]|Down 24% in 4 Wee...| 666825|
|https://www.zacks...|https://www.zacks...|Indrajit Bandyopa...|Here we discuss S...|--1z5c4ZCYWJOZ0jJ...|https://staticx-t...|  2022-11-28|{name -> Zac

                                                                                

In [19]:
print(persist_df.count())
# print(persist_df.select("id").distinct().count())

317760


In [31]:
sd=persist_df.filter("newsKey = 19301").select("tickers").show()
print(sd)

+--------------------+
|             tickers|
+--------------------+
|[SMTC, AI, ASAN, ...|
+--------------------+

None


In [21]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


chunk_size = 50000
i=0
window = Window.orderBy('newsKey')
persist_Df = persist_df.withColumn("row_idx", row_number().over(window))


# Loop through the dataframe in chunks
while i < persist_Df.count():
    
    # Select the chunk of data
    chunk=persist_Df.orderBy('newsKey').filter(persist_Df["row_idx"] >= i).take(chunk_size)

    # create a BigQuery client and dataset reference
    client = bigquery.Client(project='noted-span-377814')
    dataset_ref = client.dataset('Stocks_DW')

    # create a BigQuery table and upload the data
    table_ref = dataset_ref.table('DimNews')
    chunk_df = spark.createDataFrame(chunk).drop("row_idx")
    job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
    job = client.load_table_from_dataframe(chunk_df.toPandas(), table_ref, job_config=job_config)
    i+=chunk_size
    
    print(job.result())

23/03/17 19:47:07 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:07 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:07 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:13 WARN org.apache.spark.scheduler.TaskSetManager: Stage 25 contains a task of very large size (5260 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb802eeee0>


23/03/17 19:47:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:35 WARN org.apache.spark.scheduler.TaskSetManager: Stage 30 contains a task of very large size (5631 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb8a2615e0>


23/03/17 19:47:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:47:57 WARN org.apache.spark.scheduler.TaskSetManager: Stage 35 contains a task of very large size (6036 KiB). The maximum recommended task size is 1000 KiB.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb78637be0>


23/03/17 19:48:13 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:13 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:13 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:19 WARN org.apache.spark.scheduler.TaskSetManager: Stage 40 contains a task of very large size (5143 KiB). The maximum recommended task size is 1000 KiB.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb7883b940>


23/03/17 19:48:35 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:35 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:35 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:48:40 WARN org.apache.spark.scheduler.TaskSetManager: Stage 45 contains a task of very large size (5833 KiB). The maximum recommended task size is 1000 KiB.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb8a11c580>


23/03/17 19:49:02 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:02 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:02 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:08 WARN org.apache.spark.scheduler.TaskSetManager: Stage 50 contains a task of very large size (5139 KiB). The maximum recommended task size is 1000 KiB.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdbc70751f0>


23/03/17 19:49:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:49:32 WARN org.apache.spark.scheduler.TaskSetManager: Stage 55 contains a task of very large size (1533 KiB). The maximum recommended task size is 1000 KiB.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb77822fd0>


In [32]:
spark.stop()

In [4]:
5655902, 2128828

(5655902, 2128828)

In [25]:

# fill bridge table

bridge_Df_news = news_Df.select("newsKey","tickers").withColumn("tickers_exploded", f.explode("tickers")).drop("tickers")

# load tickers DF
ticker_Df = spark.read.format("parquet") \
            .option("header", "true") \
            .load("gs://stocks-pipeline/raw-data/ticker_details/*.parquet")

ticker_Df = ticker_Df.withColumn("tickerKey", f.row_number().over(Window.orderBy('ticker')))
bridge_Df_ticker = ticker_Df.select("tickerKey", "ticker")

# join bridge_Df_ticker and bridge_Df_news
joined_Df = bridge_Df_ticker \
            .join(bridge_Df_news, bridge_Df_ticker.ticker == bridge_Df_news.tickers_exploded, "inner") \
            .select("tickerKey", "newsKey")

joined_Df = joined_Df.distinct()
persist_Df = joined_Df.persist()

print(persist_Df.show())

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


chunk_size = 70000
i=0
window = Window.orderBy('newsKey')
persist_Df = persist_Df.withColumn("row_idx", row_number().over(window))


# Loop through the dataframe in chunks
while i < persist_Df.count():
    # Select the chunk of data
    chunk=persist_Df.orderBy('newsKey').filter(persist_Df["row_idx"] >= i).take(chunk_size)

    # create a BigQuery client and dataset reference
    client = bigquery.Client(project='noted-span-377814')
    dataset_ref = client.dataset('Stocks_DW')

    # create a BigQuery table and upload the data
    table_ref = dataset_ref.table('BrgNewsTicker')
    chunk_df = spark.createDataFrame(chunk).select('tickerKey','newsKey')
    job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
    job = client.load_table_from_dataframe(chunk_df.toPandas(), table_ref, job_config=job_config)
    i+=chunk_size
    print(job.result())
    
    
spark.stop()

23/03/17 19:59:49 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 19:59:49 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+---------+-------+
|tickerKey|newsKey|
+---------+-------+
|      966| 666825|
|     5571|  67088|
|    10100|  67088|
|     9984|  67088|
|     8643|1512812|
|     2319|1949643|
|     5936|1949643|
|     5150|1949643|
|     8282|2070274|
|     5049| 885095|
|     3347| 733447|
|    10320|1661784|
|     2943| 611618|
|     4520|1515495|
|       29|1218142|
|    10781|1218142|
|    11449|1218142|
|     6670|1218142|
|     9616|1605630|
|     3961|1892782|
+---------+-------+
only showing top 20 rows

None


23/03/17 20:01:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:30 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb808393d0>


23/03/17 20:01:39 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:39 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:39 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb71a09130>


23/03/17 20:01:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb897c0c40>


23/03/17 20:01:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:51 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb813cbaf0>


23/03/17 20:01:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:01:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb73a2d130>


23/03/17 20:02:01 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:01 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:01 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb78839f70>


23/03/17 20:02:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:09 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb81eb9970>


23/03/17 20:02:14 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:14 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:14 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb765d07f0>


23/03/17 20:02:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb7791e7f0>


23/03/17 20:02:26 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:26 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:26 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb80839a60>


23/03/17 20:02:31 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:31 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:31 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb77b697c0>


23/03/17 20:02:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/17 20:02:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<google.cloud.bigquery.job.load.LoadJob object at 0x7fdb74b70460>
