In [1]:
import csv
import json
import time
from datetime import datetime, timezone

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *



# Get a list of all coins with id, name and symbol (using Crypto API)
# API request. A timeout is passed because requests waits indefinitely by
# default, which would hang the notebook if the API never answers.
response = requests.get('https://api.coingecko.com/api/v3/coins/list', timeout=30)

# Only build the table when the API answered successfully
if response.status_code == 200:
    data = response.json()
    # json_normalize already returns a DataFrame built from the JSON records,
    # so no extra DataFrame.from_dict wrapper is needed.
    df = pd.json_normalize(data)
    print('The list of all coins with id, name and symbol')
    print(df)
    print('Get bitcoin coin id')
    # Exact match on the display name; the matching row carries the coin id
    bit = df.loc[df['name'] == "Bitcoin"]
    print(bit)
else:
    print(f'Bad request. Response code {response.status_code}')



# Get the price of bitcoin in usd and by date of the first quarter of 2022 (using Crypto API)
print('Get the price of bitcoin in usd and by date of the first quarter of 2022')
# Convert the quarter boundaries to UNIX timestamps for the request.
# The dates are pinned to UTC so the requested range does not depend on the
# timezone of the machine running the notebook (time.mktime uses local time).
date_start = '01-01-2022'
f = int(datetime.strptime(date_start, '%d-%m-%Y').replace(tzinfo=timezone.utc).timestamp())
date_finish = '31-03-2022'
# strptime yields 00:00:00 of the final day; extend to 23:59:59 so that the
# last day of the quarter is part of the requested range.
t = int(datetime.strptime(date_finish, '%d-%m-%Y').replace(tzinfo=timezone.utc).timestamp()) + 86400 - 1

# Setting parameters
parameters = {
    'id': 'bitcoin',
    'vs_currency': 'usd',
    'from': f,
    'to': t
}

# Get request (with a timeout so a stalled connection cannot hang the notebook)
response2 = requests.get(
    'https://api.coingecko.com/api/v3/coins/bitcoin/market_chart/range',
    params=parameters,
    timeout=30,
)

# Conditional in case request failed
if response2.status_code == 200:
    # Parse the body once and reuse it
    data = response2.json()
    # Saving the file to store it in data Lake.
    # newline='' is required by the csv module; without it every row is
    # followed by a blank line on Windows. The with-block guarantees the file
    # is closed even if writing fails.
    with open('Bitcoin_prices.csv', 'w', newline='') as myFile:
        writer = csv.writer(myFile)
        writer.writerow(['Timestamp', 'Prices'])
        # Each entry of 'prices' is written as one CSV row under the header above
        writer.writerows(data['prices'])
    print('File saved as csv: Bitcoin_prices.csv')
else:
    # Report the status of THIS request (response2), not the coin-list request
    print(f'Bad request. Response code {response2.status_code}')


print('Consume the data previously persisted in the database to make a window/partition function for every 5 days')
# Start the Spark session
spark = SparkSession.builder.appName('bitcoins').getOrCreate()
# Load the persisted prices and convert Timestamp to a Spark timestamp.
# The value is divided by 1000 (milliseconds -> seconds) because a numeric
# cast to 'timestamp' is interpreted as seconds since the epoch.
df = spark.read.csv('Bitcoin_prices.csv', header=True, inferSchema=True)
df = df.withColumn('Timestamp', (col('Timestamp') / 1000).cast('timestamp'))
df.show()
# Save the converted frame as csv. mode('overwrite') lets the notebook be
# re-run; the default mode raises an error when the output path already exists.
df.write.option("header", True).options(delimiter=",").mode('overwrite').csv('Konfio/Bitcoin')


# The aggregation function was not specified, so a moving average is used.
def days(i):
    """Return the number of seconds in ``i`` days (86400 seconds per day)."""
    return i * 86400


# Window over the current row and everything up to 4 days before it.
# The ordering column is cast to long (epoch seconds) because rangeBetween
# takes numeric offsets expressed in the unit of the ordering column.
windowsSpec = Window.orderBy(col('Timestamp').cast('long')).rangeBetween(-days(4), 0)
df2 = df.withColumn('Avg-5days', avg('Prices').over(windowsSpec))
df2.show()

# Save the result of the window function, then report it once it is written
df2.write.option("header", True).options(delimiter=",").mode('overwrite').csv('Konfio/Bitcoin2')
print('File saved as csv at Konfio/Bitcoin2')

The list of all coins with id, name and symbol
                     id   symbol              name
0                01coin      zoc            01coin
1                0chain      zcn               Zus
2                    0x      zrx                0x
3                0xcert      zxc            0xcert
4                 0xdao      oxd             0xDAO
...                 ...      ...               ...
12977  z-versus-project  zversus  Z Versus Project
12978          zynecoin      zyn          Zynecoin
12979              zyro     zyro              Zyro
12980             zyrri      zyr             Zyrri
12981               zyx      zyx               ZYX

[12982 rows x 3 columns]
Get bitcoin coin id
           id symbol     name
1409  bitcoin    btc  Bitcoin
Get the price of bitcoin in usd and by date of the first quarter of 2022
File saved as csv: Bitcoin_prices.csv
Consume the data previously persisted in the database to make a window/partition function for every 5 days


FileNotFoundError: [WinError 2] El sistema no puede encontrar el archivo especificado