In [None]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, BooleanType, TimestampType,
                              ArrayType, MapType, DateType)

from google.cloud import bigquery
from pyspark.sql.window import Window

spark = SparkSession \
        .builder.appName("DimNews") \
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.0") \
        .getOrCreate()

# Load the raw news articles: every parquet file under the prefix, recursively.
# ("header" is a CSV option and has no effect on parquet, so it is not set.)
# NOTE(review): the prefix reads "sock_news" — confirm it is not meant to be "stock_news".
news_Df = spark \
        .read.option("recursiveFileLookup", "true") \
        .parquet("gs://stocks-pipeline/raw-data/sock_news")

# De-duplicate on the article id BEFORE assigning surrogate keys. Assigning keys
# first left gaps in newsKey. It also made a duplicated article's key depend on
# which copy dropDuplicates happened to keep.
news_Df = news_Df.dropDuplicates(["id"])

# Surrogate key: a 1-based row number ordered by title. The (now unique) id is the
# tie-breaker, so the numbering is deterministic when several articles share a title.
# A window with no partitionBy moves all rows into a single partition.
news_Df = news_Df.withColumn("newsKey", f.row_number().over(Window.orderBy("title", "id")))

# Dropping a column that does not exist is a no-op in Spark.
# NOTE(review): confirm "ticket" is a real column. "tickers" must be kept for the bridge table.
news_Df = news_Df.drop("ticket")

# Keep only the text before "T" (the date part of an ISO timestamp), then cast it to DATE below.
news_Df = news_Df.withColumn('published_utc', f.split(news_Df['published_utc'], 'T').getItem(0))
news_Df = news_Df.withColumnRenamed("published_utc", "PublishedUtc") \
            .withColumnRenamed("amp_url", "AmpUrl") \
            .withColumnRenamed("article_url", "ArticleUrl") \
            .withColumnRenamed("image_url", "ImageUrl") \
            .withColumnRenamed("id", "ArticleId")

news_Df = news_Df.withColumn("PublishedUtc", f.col("PublishedUtc").cast(DateType()))
news_Df = news_Df.fillna(value='Unknown', subset=["AmpUrl"])

# persist() marks news_Df itself as cached and returns it.
# - The DimNews upload reads newsKey through persist_df.
# - The bridge table reads newsKey through news_Df.
# Both therefore reuse the same computed keys.
persist_df = news_Df.persist()

persist_df = persist_df.drop("keywords")

# show() prints the rows itself and returns None, so it is not wrapped in print().
persist_df.show()

# Spot check: the tickers attached to a single article.
SPOT_CHECK_NEWS_KEY = 19301
persist_df.filter(f.col("newsKey") == SPOT_CHECK_NEWS_KEY).select("tickers").show()



In [32]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


chunk_size = 50000
# newsKey is unique per row, so row_idx is a deterministic 1-based position.
window = Window.orderBy('newsKey')
# Cached so the global single-partition sort behind row_number() runs once, not once per chunk.
persist_Df = persist_df.withColumn("row_idx", row_number().over(window)).persist()

# Loop-invariant work is hoisted: one count and one BigQuery client for the whole upload.
total_rows = persist_Df.count()
client = bigquery.Client(project='noted-span-377814')
table_id = 'noted-span-377814.Stocks_DW.DimNews'
job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')

# Upload the rows in contiguous, non-overlapping windows row_idx in (i, i + chunk_size].
# row_number() starts at 1. The old `row_idx >= i` + take() therefore re-sent the
# boundary row of every chunk.
i = 0
while i < total_rows:
    chunk_df = persist_Df \
        .filter(persist_Df["row_idx"].between(i + 1, i + chunk_size)) \
        .drop("row_idx")
    job = client.load_table_from_dataframe(chunk_df.toPandas(), table_id, job_config=job_config)
    i += chunk_size

    print(job.result())

persist_Df.unpersist()

In [None]:

# fill bridge table: one row per distinct (tickerKey, newsKey) pair

# One row per (newsKey, ticker symbol) mentioned by the article. explode() emits no
# rows for a null or empty tickers array, so such articles get no bridge rows.
bridge_Df_news = news_Df.select("newsKey", "tickers") \
    .withColumn("tickers_exploded", f.explode("tickers")) \
    .drop("tickers")

# load tickers DF ("header" is a CSV-only option and has no effect on parquet)
ticker_Df = spark.read.format("parquet") \
            .load("gs://stocks-pipeline/raw-data/ticker_details/*.parquet")

# NOTE(review): tickerKey must match the keys of the ticker dimension table, which is
# presumably built elsewhere with this same row_number-over-ticker rule — confirm.
# If a symbol appears in several rows here, it receives several tickerKeys.
ticker_Df = ticker_Df.withColumn("tickerKey", f.row_number().over(Window.orderBy('ticker')))
bridge_Df_ticker = ticker_Df.select("tickerKey", "ticker")

# Resolve ticker symbols to tickerKey. The inner join drops symbols missing from ticker_details.
joined_Df = bridge_Df_ticker \
            .join(bridge_Df_news, bridge_Df_ticker.ticker == bridge_Df_news.tickers_exploded, "inner") \
            .select("tickerKey", "newsKey")

joined_Df = joined_Df.distinct()
persist_Df = joined_Df.persist()

# show() prints the rows itself and returns None, so it is not wrapped in print().
persist_Df.show()

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


chunk_size = 70000
# newsKey repeats in the bridge (one row per ticker), so tickerKey breaks the ties.
# After distinct(), (newsKey, tickerKey) is unique, which makes row_idx deterministic.
window = Window.orderBy('newsKey', 'tickerKey')
# Cached so row_idx is computed once and every chunk filter sees the same numbering.
persist_Df = persist_Df.withColumn("row_idx", row_number().over(window)).persist()

# Loop-invariant work is hoisted: one count and one BigQuery client for the whole upload.
total_rows = persist_Df.count()
client = bigquery.Client(project='noted-span-377814')
table_id = 'noted-span-377814.Stocks_DW.BrgNewsTicker'
job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')

# Upload the rows in contiguous, non-overlapping windows row_idx in (i, i + chunk_size].
# The old code used orderBy('newsKey') + filter(row_idx >= i) + take(). It duplicated
# each chunk's boundary row, because row_number() is 1-based. It could also duplicate
# or skip rows of a newsKey group that straddled a chunk boundary.
i = 0
while i < total_rows:
    chunk_df = persist_Df \
        .filter(persist_Df["row_idx"].between(i + 1, i + chunk_size)) \
        .select('tickerKey', 'newsKey')
    job = client.load_table_from_dataframe(chunk_df.toPandas(), table_id, job_config=job_config)
    i += chunk_size
    print(job.result())


spark.stop()