In [84]:
# import pyspark
# from pyspark.sql import SparkSession
# from pyspark.conf import SparkConf
# from pyspark.context import SparkContext
# from pyspark.sql import functions as F
# from pyspark.sql import types

import os
import pandas as pd
import numpy as np
# import gcsfs
import dask.dataframe as dd
from dotenv import load_dotenv
from datetime import datetime, date, timedelta

In [5]:
load_dotenv()
credentials_location = os.getenv('GCP_CREDENTIALS_LOCATION')
gcp_bucket_name = os.getenv('GCP_BUCKET')
gcp_temporary_bucket = os.getenv('GCP_TEMP_BUCKET')
PROJECT_ID = os.getenv('GCP_PROJECT_ID')

credentials = '/Users/Manu/google_credentials.json'

In [50]:
cot = pd.DataFrame()

In [51]:
date.today().year


2022

In [73]:
# cot_reports_2022-06-10.txt
cot = pd.DataFrame()
print(f'shape of cot: {cot.shape}')
current_year = date.today().year
for yr in range(2011, current_year + 1):
    path = f'gs://{gcp_bucket_name}/raw/cot_reports_{yr}-*.txt'
    df = pd.read_csv(path, storage_options={"token": credentials})
    cot = cot.append(df, ignore_index=True)

shape of cot: (0, 0)
shape of cot: (1879, 87)
shape of cot: (3779, 87)
shape of cot: (5668, 88)
shape of cot: (7528, 88)
shape of cot: (9376, 88)
shape of cot: (11290, 88)
shape of cot: (13400, 88)
shape of cot: (15938, 88)
shape of cot: (18456, 88)
shape of cot: (20801, 88)
shape of cot: (23404, 88)
shape of cot: (24551, 88)


In [82]:
cot.isnull().sum()

Market_and_Exchange_Names         0
As_of_Date_In_Form_YYMMDD         0
Report_Date_as_MM_DD_YYYY     20772
CFTC_Contract_Market_Code         0
CFTC_Market_Code                  0
                              ...  
CFTC_Market_Code_Quotes           0
CFTC_Commodity_Code_Quotes        0
CFTC_SubGroup_Code                0
FutOnly_or_Combined               0
Report_Date_as_YYYY-MM-DD      3779
Length: 88, dtype: int64

In [83]:
cot[['As_of_Date_In_Form_YYMMDD', 'Report_Date_as_MM_DD_YYYY', 'Report_Date_as_YYYY-MM-DD']]

Unnamed: 0,As_of_Date_In_Form_YYMMDD,Report_Date_as_MM_DD_YYYY,Report_Date_as_YYYY-MM-DD
0,111227,2011-12-27,
1,111220,2011-12-20,
2,111213,2011-12-13,
3,111206,2011-12-06,
4,111129,2011-11-29,
...,...,...,...
24546,220201,,2022-02-01
24547,220125,,2022-01-25
24548,220118,,2022-01-18
24549,220111,,2022-01-11


In [85]:
conditions = [(cot['Report_Date_as_YYYY-MM-DD'].isnull()), (cot['Report_Date_as_YYYY-MM-DD'].notnull())]
values = [cot['Report_Date_as_MM_DD_YYYY'], cot['Report_Date_as_YYYY-MM-DD']]

cot['report_date'] = np.select(conditions, values)

In [88]:
cot[['Report_Date_as_YYYY-MM-DD', 'Report_Date_as_MM_DD_YYYY', 'report_date']]

Unnamed: 0,Report_Date_as_YYYY-MM-DD,Report_Date_as_MM_DD_YYYY,report_date
0,,2011-12-27,2011-12-27
1,,2011-12-20,2011-12-20
2,,2011-12-13,2011-12-13
3,,2011-12-06,2011-12-06
4,,2011-11-29,2011-11-29
...,...,...,...
24546,2022-02-01,,2022-02-01
24547,2022-01-25,,2022-01-25
24548,2022-01-18,,2022-01-18
24549,2022-01-11,,2022-01-11


In [115]:
def connection_pd():
    cot = pd.DataFrame()
    current_year = date.today().year
    
    for yr in range(2011, current_year + 1):
        path = f'gs://{gcp_bucket_name}/raw/cot_reports_{yr}-*.txt'
        df = pd.read_csv(path, storage_options={"token": credentials})
        cot = cot.append(df, ignore_index=True)
        
    return cot

In [116]:
cot_df = connection_pd()

In [117]:
def organize_columns(cot_df):
    """
        This function organizes the report dates into one column since currently
        they are spread in 3 different columns
    """
    conditions = [(cot['Report_Date_as_YYYY-MM-DD'].isnull()),
                  (cot['Report_Date_as_YYYY-MM-DD'].notnull())
                 ]
    
    values = [cot['Report_Date_as_MM_DD_YYYY'],
              cot['Report_Date_as_YYYY-MM-DD']
             ]
    
    cot_df['report_date'] = np.select(conditions, values)
    
    # convert to datetime type
    cot_df['report_date'] = pd.to_datetime(cot_df['report_date'], yearfirst=True)
    
    cot_df = cot_df.drop(['Report_Date_as_MM_DD_YYYY', 'Report_Date_as_YYYY-MM-DD',  'As_of_Date_In_Form_YYMMDD'], axis=1)
    
    return cot_df

In [118]:
cot_org = organize_columns(cot_df)

In [123]:
def get_latest_data(cot):
    """
    This function fetches the most recent unprocessed data
    """
    report_release = date.today() - timedelta(days = 4)
    cot_latest = cot[cot['report_date'] >= report_release]
    
    return cot_latest

In [None]:
def clean_colums(cot_df):
    
    # make column headers lower case
    cot_df.columns = [x.lower() for x in cot_df.columns]
    
    # separate market and Exchange names
    cot_df['market_name'] = cot_df['Market_and_Exchange_Names'].str.split('-').str[0]
    cot_df['exchange_name'] = cot_df['Market_and_Exchange_Names'].str.split('-').str[1]
    
    cot_df = cot_df.drop('Market_and_Exchange_Names')
    
    return cot_df
    

In [137]:
cot_org['market_name'] = cot_org['Market_and_Exchange_Names'].str.split('-').str[0]
cot_org['exchange_name'] = cot_org['Market_and_Exchange_Names'].str.split('-').str[1]
cot_org[['Market_and_Exchange_Names', 'market', 'exchange_name']]

Unnamed: 0,Market_and_Exchange_Names,market,exchange_name
0,CANADIAN DOLLAR - CHICAGO MERCANTILE EXCHANGE,CANADIAN DOLLAR,CHICAGO MERCANTILE EXCHANGE
1,CANADIAN DOLLAR - CHICAGO MERCANTILE EXCHANGE,CANADIAN DOLLAR,CHICAGO MERCANTILE EXCHANGE
2,CANADIAN DOLLAR - CHICAGO MERCANTILE EXCHANGE,CANADIAN DOLLAR,CHICAGO MERCANTILE EXCHANGE
3,CANADIAN DOLLAR - CHICAGO MERCANTILE EXCHANGE,CANADIAN DOLLAR,CHICAGO MERCANTILE EXCHANGE
4,CANADIAN DOLLAR - CHICAGO MERCANTILE EXCHANGE,CANADIAN DOLLAR,CHICAGO MERCANTILE EXCHANGE
...,...,...,...
24546,BLOOMBERG COMMODITY INDEX - CHICAGO BOARD OF T...,BLOOMBERG COMMODITY INDEX,CHICAGO BOARD OF TRADE
24547,BLOOMBERG COMMODITY INDEX - CHICAGO BOARD OF T...,BLOOMBERG COMMODITY INDEX,CHICAGO BOARD OF TRADE
24548,BLOOMBERG COMMODITY INDEX - CHICAGO BOARD OF T...,BLOOMBERG COMMODITY INDEX,CHICAGO BOARD OF TRADE
24549,BLOOMBERG COMMODITY INDEX - CHICAGO BOARD OF T...,BLOOMBERG COMMODITY INDEX,CHICAGO BOARD OF TRADE


In [3]:
def connection_setup():
    """
    This function sets up the spark connection and reads the data from cloud storage
    """
    #jar files
    jar_1 = "/Users/Manu/lib/spark-bigquery-with-dependencies_2.12-0.24.2.jar"
    jar_2 = "/Users/Manu/lib/gcs-connector-hadoop3-2.2.5.jar"
    data_date = datetime.today().strftime('%Y-%m-%d')
    
    
    # configuration
    conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", f'{jar_1}, {jar_2}') \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
    
    # context
    sc = SparkContext(conf=conf)
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
    hadoop_conf.set("fs.gs.auth.service.account.enable", 'true')
    
    # Session
    spark = SparkSession.builder \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.11:0.12.2", conf=sc.getConf()) \
    .getOrCreate()
    
    # Read the data
    cot = spark.read \
        .option('header', 'true') \
        .csv(f'gs://{gcp_bucket_name}/raw/*') #_{data_date}.txt')
    
    return cot

In [4]:
cot = connection_setup()

22/06/10 17:44:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [5]:
def get_latest_data(cot):
    """
    This function fetches the most recent unprocessed data
    """
#     report_date = date.today() - timedelta(days = 9)
#     cot = cot.withColumn('Report_Date_as_YYYY-MM-DD', cot['Report_Date_as_YYYY-MM-DD'].cast(types.DateType()))
#     cot = cot.filter(cot['Report_Date_as_YYYY-MM-DD'] >= report_date)
    
    return cot

In [6]:
cot_filter = get_latest_data(cot)

In [7]:
cot_filter.count()
#cot_filter.select(['Market_and_Exchange_Names', 'Report_Date_as_YYYY-MM-DD', 'Open_Interest_All']).show()

                                                                                

24551

In [8]:
def clean_columns(cot):        
    # extract the numeric part of the contracts
    # drop the As_of_Date_In_Form_YYMMDD column
    cot_cols = cot.withColumn('Contract_Units', F.substring(cot['Contract_Units'], 19,7))\
                .drop('As_of_Date_In_Form_YYMMDD')
    
    # Make all columns lower case
    for col in cot_cols.columns:
        cot_cols = cot_cols.withColumnRenamed(col, col.lower())
    
    return cot_cols
    

In [9]:
cot_df = clean_columns(cot)

In [10]:
def add_schema(cot_df):
    desired_schema = [
    types.StringType(),types.DateType(),types.StringType(),types.StringType(),types.StringType(),types.StringType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),
    types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),
    types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),types.IntegerType(),
    types.IntegerType(),types.StringType(),types.StringType(),types.FloatType(),types.FloatType(),types.FloatType(),
    types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.FloatType(),types.StringType(),
    types.StringType(),types.StringType(),types.StringType(),types.StringType(),types.StringType()
    ]
    
    for idx, col in enumerate(cot_df.columns):
        for idx_s, dtype in enumerate(desired_schema):
            if idx == idx_s:
                cot_df = cot_df.withColumn(col, cot_df[col].cast(dtype))
            else:
                pass
            
    return cot_df

In [11]:
cot_schema = add_schema(cot_df)

In [12]:
def write_to_parquet(cot_schema):
    cot_schema.write.parquet(f'gs://{gcp_bucket_name}/cleaned/pq', mode='overwrite')
    return cot_schema

In [13]:
write_to_parquet(cot_schema)

22/06/10 17:45:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/06/10 17:45:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Market_and_Exchange_Names, Report_Date_as_MM_DD_YYYY, CFTC_Contract_Market_Code, CFTC_Market_Code, CFTC_Region_Code, CFTC_Commodity_Code, Open_Interest_All, Dealer_Positions_Long_All, Dealer_Positions_Short_All, Dealer_Positions_Spread_All, Asset_Mgr_Positions_Long_All, Asset_Mgr_Positions_Short_All, Asset_Mgr_Positions_Spread_All, Lev_Money_Positions_Long_All, Lev_Money_Positions_Short_All, Lev_Money_Positions_Spread_All, Other_Rept_Positions_Long_All, Other_Rept_Positions_Short_All, Other_Rept_Positions_Spread_All, Tot_Rept_Positions_Long_All, Tot_Rept_Positions_Short_All, NonRept_Positions_Long_All, NonRept_Positions_Short_All, Change_in_Open_Interest_All, Change_in_Dealer_Long_All, Change_in_Dealer_Short_All, Chan

                                                                                

DataFrame[market_and_exchange_names: string, report_date_as_yyyy-mm-dd: date, cftc_contract_market_code: string, cftc_market_code: string, cftc_region_code: string, cftc_commodity_code: string, open_interest_all: int, dealer_positions_long_all: int, dealer_positions_short_all: int, dealer_positions_spread_all: int, asset_mgr_positions_long_all: int, asset_mgr_positions_short_all: int, asset_mgr_positions_spread_all: int, lev_money_positions_long_all: int, lev_money_positions_short_all: int, lev_money_positions_spread_all: int, other_rept_positions_long_all: int, other_rept_positions_short_all: int, other_rept_positions_spread_all: int, tot_rept_positions_long_all: int, tot_rept_positions_short_all: int, nonrept_positions_long_all: int, nonrept_positions_short_all: int, change_in_open_interest_all: int, change_in_dealer_long_all: int, change_in_dealer_short_all: int, change_in_dealer_spread_all: int, change_in_asset_mgr_long_all: int, change_in_asset_mgr_short_all: int, change_in_asset_

In [16]:
def write_to_bigquery(cot_schema):
    cot_schema.write \
                .format('bigquery') \
                .option('project', PROJECT_ID) \
                .option('parentProject', PROJECT_ID) \
                .option('table', 'committment_of_traders.cot') \
                .option("temporaryGcsBucket",f"{gcp_temporary_bucket}") \
                .mode('overwrite') \
                .save()
    

In [17]:
write_to_bigquery(cot_schema)

22/06/10 17:53:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Market_and_Exchange_Names, Report_Date_as_MM_DD_YYYY, CFTC_Contract_Market_Code, CFTC_Market_Code, CFTC_Region_Code, CFTC_Commodity_Code, Open_Interest_All, Dealer_Positions_Long_All, Dealer_Positions_Short_All, Dealer_Positions_Spread_All, Asset_Mgr_Positions_Long_All, Asset_Mgr_Positions_Short_All, Asset_Mgr_Positions_Spread_All, Lev_Money_Positions_Long_All, Lev_Money_Positions_Short_All, Lev_Money_Positions_Spread_All, Other_Rept_Positions_Long_All, Other_Rept_Positions_Short_All, Other_Rept_Positions_Spread_All, Tot_Rept_Positions_Long_All, Tot_Rept_Positions_Short_All, NonRept_Positions_Long_All, NonRept_Positions_Short_All, Change_in_Open_Interest_All, Change_in_Dealer_Long_All, Change_in_Dealer_Short_All, Change_in_Dealer_Spread_All, Change_in_Asset_Mgr_Long_All, Change_in_Asset_Mgr_Short_All, Change_in_Asset_Mgr_Spread_All, Change_in_Lev_Money_Long_All, Change_in_Lev_Money_Short_All, C

                                                                                

In [None]:
datetime.today().strftime('%Y-%m-%d')

In [None]:
!pip install pipe

In [None]:
import pipe

In [None]:
help(pipe)

In [None]:
def first(x):
    return x+4

def second(x):
    return x - 4



In [None]:
frst = pipe.Pipe(first)
second = pipe.Pipe(second)

In [None]:
1 | second 

In [None]:
lst = ['first', 'second', 'third']

In [None]:
idx = lst.index('first')

In [None]:
lst.insert(idx + 1, 'first_1')

In [None]:
lst