In [None]:
#######################################
###!@0 START INIT ENVIRONMENT
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz -P /content/drive/MyDrive # link wrong in blog
!tar xf /content/drive/Shareddrives/DA231-2021-Aug-Public/spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

###!@0 END INIT ENVIRONMENT

Mounted at /content/drive


In [None]:
#######################################
###!@1 START OF PYSPARK INIT
import findspark

# Make the Spark install importable from Python (presumably located via the
# SPARK_HOME set in the environment cell), then look it up.
findspark.init()
findspark.find()

from pyspark.sql import SparkSession

# Local-mode session named "Colab", with the Spark UI moved to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
# Last expression of the cell, so the notebook displays the session.
# Spark is ready to go within Colab!
spark
###!@1 END OF PYSPARK INIT

In [None]:
#Common Imports
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf
import sys
import json
import pandas as pd

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [None]:
# Folder ID of a folder in my Google Drive. This has to be changed
# to the ID of the Drive folder that should receive the generated CSV files
# (it is used as the upload parent when the CSVs are uploaded).
fid = "1aWbFBZuxGqNouFyxGWQkRIG5vd58JPgQ"

# Authenticate the Colab user first, then hand the resulting application-default
# credentials directly to PyDrive. The order matters: the credentials are only
# available after authenticate_user() has run.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# PyDrive client used to list existing uploads and to upload new CSVs.
drive2 = GoogleDrive(gauth)

In [None]:
sc = SparkContext.getOrCreate()

# Source folder with one CSV of hourly data per coin, plus the "tickers" file.
drivepath = "/content/drive/Shareddrives/ProjectSharedDrive/wazirxCSVData"
driveDAILYpath = "/content/drive/Shareddrives/ProjectSharedDrive/wazirxDAILY"


# "tickers" is parsed as JSON (a dict keyed by market symbol). The `with` block
# closes the handle even if json.load raises on a malformed file.
with open(drivepath + "/tickers", "r") as tickers:
    tfile = json.load(tickers)

Get the list of coins (market symbols) from the **tickers** file.
Keep only the **USDT** markets.

In [None]:
coins = list(tfile.keys())
# Keep only markets quoted in USDT (symbols ending in "usdt", e.g. "btcusdt").
# A plain substring test would also match pairs whose *base* asset is USDT
# (e.g. "usdtinr"), whose prices are in another currency.
USDTcoins = [s for s in coins if s.endswith("usdt")]

Sanity check in case a previous upload run was interrupted part-way through:
list the CSV files already on Google Drive and compare them with the coin list built above.
Only the coins that have not been uploaded yet are processed.

In [None]:

# Only look inside the upload folder and skip trashed files, so unrelated CSVs
# elsewhere in Drive (or deleted uploads) are not mistaken for progress.
query = "'{}' in parents and title contains 'csv' and trashed = false".format(fid)
uploaded = drive2.ListFile({'q': query}).GetList()

# Name the file was uploaded with; fall back to the title when Drive does not
# report an originalFilename for an entry.
alreadyUploaded = [fil.get('originalFilename', fil['title']) for fil in uploaded]
uploadedNames = set(alreadyUploaded)

# Each processed coin produces these three files. A coin counts as uploaded
# only when all of them exist, because an interrupted run can leave a partial set.
OUTPUT_SUFFIXES = ("HOURLY.csv", "DAILY.csv", "MONTHLY.csv")

def _fully_uploaded(coin):
  """Return True if every output file for `coin` is already in the upload folder."""
  return all(coin + suffix in uploadedNames for suffix in OUTPUT_SUFFIXES)

coinsUploaded = [s for s in USDTcoins if _fully_uploaded(s)]
coinsNotUploaded = [s for s in USDTcoins if not _fully_uploaded(s)]

len(coinsNotUploaded)

0


Creating Cumulative Data

**Hourly to Daily**

**Date**: A day runs from 00:00:00 to 23:00:00 (e.g. 02-11-2021 00:00:00 to 02-11-2021 23:00:00). The daily Date keeps only the date part, in the same format as the source data, e.g. 02-11-2021.

**Open**: Use the Hourly.'Open' value of the first hour of the day (00:00:00)

**High**: Use the Max of Hourly.'High' Column

**Low**: Use the Min of Hourly.'Low' Column

**Close**: Use the Hourly.'Close' value of the last hour of the day (23:00:00)

**Volume**: Get the Sum of Hourly.'Volume'

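The same logic is used for **Daily to Monthly**. Incomplete periods are dropped: a day is kept only if it has more than 18 hourly rows, and a month only if it has more than 20 daily rows.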


In [None]:
def _aggregate_ohlcv(frame, period):
  """Roll `frame` up into one OHLCV row per distinct value of column `period`.

  Open/Close are taken from the earliest/latest row by `Date` inside each
  group, via min/max over a (Date, value) struct. `F.first`/`F.last` after a
  groupBy depend on row order, which Spark does not guarantee after a shuffle,
  so they could return an arbitrary row's value. High/Low/Volume are
  max/min/sum. `Count` is the number of rows per group; callers use it to drop
  incomplete periods. The grouping column is renamed to `Date` and the result
  is sorted by it.
  """
  return (frame.groupBy(period)
          .agg(F.min(F.struct("Date", "Open")).getField("Open").alias("Open"),
               F.max("High").alias("High"),
               F.min("Low").alias("Low"),
               F.max(F.struct("Date", "Close")).getField("Close").alias("Close"),
               F.sum("Volume").alias("Volume"),
               F.count(period).alias("Count"))
          .withColumnRenamed(period, "Date")
          .orderBy("Date"))


for coin in coinsNotUploaded:
  coinname = coin
  print("Processing "+ coin)
  coinfile = drivepath+"/"+coinname+".csv"
  coinDF1 = spark.read.option("header",True).option("inferSchema",True).csv(coinfile)
  hourDF = coinDF1.dropDuplicates()

  #DAILY: group hourly rows by the first 10 characters (the date part) of Date
  dayDF = hourDF.withColumn("Daily", col("Date")[0:10])
  DailyCumulative = _aggregate_ohlcv(dayDF, "Daily")
  # Keep only days with more than 18 hourly rows
  DailyDF = DailyCumulative.filter(col("Count") > 18).drop("Count")

  #MONTHLY: group the (unfiltered) daily rows by the first 7 characters of Date
  monthDF = DailyCumulative.withColumn("Monthly", col("Date")[0:7])
  MonthlyCumulative = _aggregate_ohlcv(monthDF, "Monthly")
  # Keep only months with more than 20 daily rows
  MonthlyDF = MonthlyCumulative.filter(col("Count") > 20).drop("Count")

  # Upload order: HOURLY goes last, so its presence on Drive implies the DAILY
  # and MONTHLY files were uploaded too. An interrupted run can then never look
  # complete to a resume check that keys on the HOURLY file.
  outputs = [(DailyDF, coinname+"DAILY.csv"),
             (MonthlyDF, coinname+"MONTHLY.csv"),
             (hourDF, coinname+"HOURLY.csv")]

  #CSV Files in Local Storage: write all three before uploading any of them
  for frame, filename in outputs:
    frame.toPandas().to_csv(filename)

  #Upload to Google Drive
  for _, filename in outputs:
    f = drive2.CreateFile({'title': filename, 'parents':[{'id': fid}]})
    f.SetContentFile(filename)
    f.Upload()
  print("Uploaded "+coin)
