# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 30
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2
%additional_python_modules polygon-api-client, nltk, transformers, beautifulsoup4, termcolor

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from urllib.request import urlopen, Request
from bs4 import BeautifulSoup
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import pandas as pd
import matplotlib.pyplot as plt
import time

from polygon import RESTClient
from polygon.rest.models import *

from termcolor import colored as cl
import requests
import datetime
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 30 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2
Additional python modules to be included:
polygon-api-client
nltk
transformers
beautifulsoup4
termcolor
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::973411499138:role/LabRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 7915be7c-4cb0-4d72-9691-1c496dda84e3
Job Type: glueetl
Applying the following default arguments:
-

#### Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
def get_data_from_catalog(db_name, table, glue_context):
    dyf = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=table)
    pandas_df = dyf.toDF().toPandas()
    
    return pandas_df
    




#### Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [3]:
import pandas as pd
from awsglue.dynamicframe import DynamicFrame

def write_aws_table(
    pandas_df, 
    s3_path, 
    glue_context, 
    spark_session, 
    db_name, 
    table_name
):
    s3output = glue_context.getSink(
      path=s3_path,
      connection_type="s3",
      updateBehavior="UPDATE_IN_DATABASE",
      partitionKeys=[],
      compression="snappy",
      enableUpdateCatalog=True,
      transformation_ctx="s3output",
    )
    s3output.setCatalogInfo(
      catalogDatabase=db_name, catalogTableName=table_name
    )
    s3output.setFormat("glueparquet")
    data_spark_df = spark.createDataFrame(pandas_df)
    data_dyf = DynamicFrame.fromDF(data_spark_df, glue_context, f"{table_name}_df")
    s3output.writeFrame(data_dyf)




In [4]:
get_data_from_catalog("project", "kaggle", glueContext)

         index_key  ... ticker
0              0.0  ...      A
1              1.0  ...      A
2              2.0  ...      A
3              3.0  ...      A
4              4.0  ...      A
...            ...  ...    ...
1400464  1413844.0  ...     ZX
1400465  1413845.0  ...     ZX
1400466  1413846.0  ...     ZX
1400467  1413847.0  ...     ZX
1400468  1413848.0  ...     ZX

[1400469 rows x 4 columns]


In [5]:
news_tables = ["raw_polygon_news", "raw_wfc_2021_news", "raw_tgt_2021_news", "raw_tgt_2020_news", "raw_wfc_2020_news", "kaggle"]
db = "project"
combined_news_data = pd.DataFrame()
tickers = ["TGT", "WFC"]

for news_table in news_tables:
    news_table_data = get_data_from_catalog(db, news_table, glueContext)
    if news_table == "kaggle":
        news_table_data["date"] = pd.to_datetime(news_table_data["date"], errors='coerce', format='%Y-%m-%d %H:%M:%S', utc= True)
        news_table_data["date"] = news_table_data["date"].dt.strftime('%Y/%m/%d')
        news_table_data = news_table_data.drop("index_key", axis= 1)
        news_table_data = news_table_data.loc[(news_table_data["date"] > "2019/01/01") & (news_table_data["ticker"].isin(tickers))]
        
    combined_news_data = pd.concat([combined_news_data, news_table_data])
    
combined_news_data_target_fields = combined_news_data[["title", "date", "ticker"]]
combined_news_data_target_fields

                                                     title        date ticker
0            Top Ranked Growth Stocks to Buy for June 16th  2021/06/16    TGT
1                        Wall Street Breakfast: Taper Talk  2021/08/19    TGT
2        Target shares continue declining to find poten...  2022/02/26    TGT
3            Top Ranked Growth Stocks to Buy for June 23rd  2021/06/23    TGT
4        Private Brands Alone Can't Save Bed Bath & Beyond  2021/06/06    TGT
...                                                    ...         ...    ...
1337347  UPDATE: Citi Believes 'risk/reward for the ban...  2019/01/10    WFC
1337348  This Day In Market History: First American Ban...  2019/01/07    WFC
1337349  Wells Fargo's Valuation Discounts Regulatory C...  2019/01/02    WFC
1337350  Benzinga's Top Upgrades, Downgrades For Januar...  2019/01/02    WFC
1337351  RBC Capital Upgrades Wells Fargo to Sector Per...  2019/01/02    WFC

[7471 rows x 3 columns]


# Guardado en Transformed

In [7]:
write_aws_table(
    combined_news_data_target_fields, 
    "s3://project-2023-datalake/transformed/tickers_news/wfc_tgt_news_2019_to_2023/", 
    glueContext, 
    spark, 
    "project", 
    "transformed_wfc_tgt_news"
)




# Checkeo de fechas faltantes

In [8]:
def get_missing_dates(values, init_date, end_date):
  values = set(values)
  missing_dates = []
  init_date_dt = datetime.datetime.strptime(init_date, '%Y/%m/%d')
  end_date_dt = datetime.datetime.strptime(end_date, '%Y/%m/%d')
  date_diff = (end_date_dt - init_date_dt).days

  for day_index in range(1, date_diff + 1):
    curr_date = init_date_dt + datetime.timedelta(days = day_index)
    curr_date_str = curr_date.strftime('%Y/%m/%d')
    #print(f"Current date in datetime: {curr_date}")

    if init_date_dt <= curr_date <= end_date_dt and curr_date_str in values:
      #print(f"{curr_date_str} date not missing")
      continue
    else:
      missing_dates.append(curr_date_str)
      #print(f"Missing date: {curr_date_str}")
  
  return missing_dates




In [9]:
# Missing date in overall dataset
INIT_DATE = '2019/01/01'
END_DATE = '2023/05/28'

news_date_vals = pd.unique(combined_news_data_target_fields["date"])
news_missing_dates = get_missing_dates(news_date_vals, INIT_DATE, END_DATE)
print(f" Missing dates between {INIT_DATE} - {END_DATE}:")
print(news_missing_dates)
print(f"Total count: {len(news_missing_dates)}")
print(f"Fraction of a Year missing days: {len(news_missing_dates)/365}")

 Missing dates between 2019/01/01 - 2023/05/28:
['2019/01/03', '2019/01/04', '2019/01/05', '2019/01/06', '2019/01/12', '2019/01/17', '2019/01/18', '2019/01/19', '2019/01/20', '2019/01/21', '2019/01/23', '2019/01/25', '2019/01/26', '2019/01/27', '2019/01/30', '2019/01/31', '2019/02/03', '2019/02/05', '2019/02/06', '2019/02/09', '2019/02/10', '2019/02/11', '2019/02/14', '2019/02/16', '2019/02/18', '2019/02/21', '2019/02/22', '2019/02/25', '2019/02/26', '2019/03/02', '2019/03/03', '2019/03/07', '2019/03/08', '2019/03/09', '2019/03/10', '2019/03/11', '2019/03/14', '2019/03/15', '2019/03/16', '2019/03/17', '2019/03/19', '2019/03/23', '2019/03/24', '2019/03/27', '2019/03/31', '2019/04/03', '2019/04/05', '2019/04/06', '2019/04/10', '2019/04/13', '2019/04/14', '2019/04/19', '2019/04/20', '2019/04/21', '2019/04/22', '2019/04/23', '2019/04/25', '2019/04/27', '2019/04/30', '2019/05/01', '2019/05/05', '2019/05/07', '2019/05/09', '2019/05/11', '2019/05/12', '2019/05/18', '2019/05/19', '2019/05/26',

## Revision de dias faltantes en el año

In [10]:
news_missing_dates_fomatted = [date.split("/") for date in news_missing_dates]
news_missing_dates_fomatted = pd.DataFrame(news_missing_dates_fomatted, columns=["year", "month", "day"])
news_missing_dates_fomatted.groupby(["year"]).count()

      month  day
year            
2019    160  160
2020    165  165
2021     55   55
2022     27   27
2023      8    8


## Revision de dias faltantes por mes en el año

In [11]:
news_missing_dates_fomatted.groupby(["year", "month"]).count()

            day
year month     
2019 01      16
     02      13
     03      16
     04      14
     05      11
     06      17
     07      14
     08      11
     09      15
     10      10
     11      11
     12      12
2020 01      11
     02      14
     03       5
     04       8
     05       9
     06      10
     07      10
     08      20
     09      24
     10      25
     11      17
     12      12
2021 01      27
     02      14
     03       6
     04       3
     05       1
     08       2
     09       1
     12       1
2022 01       4
     02       2
     03       3
     04       1
     05       3
     06       1
     07       3
     08       2
     09       4
     10       2
     12       2
2023 01       1
     02       2
     03       1
     04       3
     05       1
