# Data retrieval from localbitcoins API

In [None]:
import requests
import pandas as pd
import numpy as np
import datetime as dt
from datetime import datetime, timezone
import pytz
from data_loader_s3 import upload_file

In [None]:
def make_request(url_base,url):
    r = requests.get(url_base+url)
    file_json = r.json()
    return file_json

def json_to_pandas(file_json:dict):
    return pd.DataFrame.from_dict(file_json)


### Calculation exchange rate of reference to further math on pct_change:
def exchange_reference(df, date_reference):
    
    memory = {}
        
    for currency_code in df["currency_code"].unique():
        
        if currency_code != "VED":
            date_reference = date_reference
        else:
            date_reference = "2021-10-10"

    
        ## Select currency for calculations:
        df_exchange = df[df['currency_code'] == currency_code ]

        ## Select date to calculate reference

        df_date_reference = df_exchange.loc[(df_exchange.date).dt.date == pd.Timestamp(date_reference)]

        ### Averare exchange rate for day of reference:

        implicit_exchange_reference = df_date_reference.implicit_exchange.mean()
        
        memory[currency_code] = implicit_exchange_reference

    ### Percentual Variation

#     equation = ((df_exchange.implicit_exchange / implicit_exchange_reference)-1)*100
    
    
    return memory

## Currencies

In [None]:
url_base = 'https://localbitcoins.com/'
# Currencies:
url = "api/currencies"
currency_code = make_request(url_base,url)
df_currency_code = json_to_pandas(currency_code['data'])
# df_currency_code.info()
# df_currency_code.head()

In [None]:
## Extract country information related to currency
currency_name = []
currency_is_altcoin = []
for index, row in df_currency_code.iterrows():
    currency_name.append(row[0]["name"])
    currency_is_altcoin.append(row[0]["altcoin"])
    
## Reorganize on df

df_currency_code['currencies'] = currency_name
df_currency_code['altcoin'] = currency_is_altcoin
df_currency_code = df_currency_code.reset_index()

df_currency_code.head()

### Average price of btc by country (in local currency)

In [None]:
url = "bitcoinaverage/ticker-all-currencies/"
price_btc = make_request(url_base,url)
time_stamp = datetime.now().timestamp()

In [None]:
df_price_btc = json_to_pandas(price_btc)

#transpose data frame. Index is now by Country:

df_price_btc = df_price_btc.T
df_price_btc['time_stamp'] = time_stamp
df_price_btc = df_price_btc.reset_index()
print(df_price_btc.info())
df_price_btc.head()

## Merge and clean dataframe

In [None]:
df_market = df_price_btc.merge(df_currency_code, how='right')

# Drop the rows with NAN values in time_stamp
df_market.dropna(subset=['time_stamp'], inplace=True)

# Add date from time_stamp
df_market['date'] = datetime.fromtimestamp(time_stamp, tz=timezone.utc)
df_market = df_market[['time_stamp','date','index','currencies','volume_btc','avg_24h','avg_12h','avg_6h','avg_1h','altcoin']]
df_market.rename(columns = {'index':'currency_code'}, inplace = True)
print(df_market.info())
df_market.head(10)


### I should include some data formatting so info is stored in the right format

In [None]:
## Adding avg_24h exchange rate USD/BTC for inmediate comparison:

df_market["avg_24h_usd"] = df_market[df_market['currency_code']=="USD"].avg_24h.tolist()[0]

In [None]:
### Formatting data types:

# Float values:
for column_label in ['volume_btc','avg_24h','avg_12h','avg_6h','avg_1h','avg_24h_usd']:

    df_market[column_label] = pd.to_numeric(df_market[column_label])

df_market.info()

In [None]:
## Calculating implicit exchange rate:
df_market['implicit_exchange'] = df_market['avg_24h'].divide(df_market["avg_24h_usd"], fill_value = None)

In [None]:
# Drop the rows with NAN values in time_stamp
df_market.dropna(subset=['time_stamp'], inplace=True)

In [None]:
df_market.head(2)

## Save information in .csv and sql database

In [None]:
# Transform timestamp into ISO standard - UTC time zone:
time_stamp = datetime.fromtimestamp(time_stamp)
# Convert to UTC
time_stamp_utc = time_stamp.astimezone(pytz.utc)
print("Timestamp UTC", time_stamp_utc)

In [None]:
# Saving csv by timestamp:
df_market.to_csv(f'./data/csv/{time_stamp_utc}.csv',index=False)
# Saving parquet by timestamp:
df_market.to_parquet(f'./data/parquet/{time_stamp_utc}.parquet',index=False)

# Saving historical data by appending:
df_market.to_csv(f'./data/historical_data.csv',index = False, header = None, mode = 'a')
# Read historical data and transform into parquet :
df_historical = pd.read_csv('./data/historical_data.csv')

# Overwrite to update file with appended data
df_historical.to_parquet(f'./data/historical_data.parquet',index=False)

In [None]:
# Append information on sqlite database

from sqlalchemy import Column, Integer, Float, String, DateTime,Boolean, ForeignKey
from sqlalchemy.orm import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///data/CurrenciesDataBase.db')
print(type(engine))
df_market.to_sql(name="currencies_vs_btc", con=engine, if_exists="append", index=False)

#### Calculation of pct_change taking 2021-10-03 as date of reference

In [None]:
# path_to_currencies = "sqlite:///data/CurrenciesDataBase.db"
# df_market = pd.read_sql('currencies_vs_btc', path_to_currencies)

# ### The pct calculations are not properly saved on the dataset unless I upload one previously saved. Why?
# # The pipeline defined solve the issue by saving the calculated data on a new dataset but the calculations
# # are done in the full dataset everytime the code runs.
# # This practice is not scalable

# # The pct could be calculated on the web interface for a given date selected by the users but this implies 
# # more processing time

In [None]:
# df = df_market.groupby(by=["date","currency_code"]).mean()
# df.reset_index(inplace=True)

# ### Dataset with implicit exchange of reference by currency:
# date_reference = "2021-10-03"
# exchange = exchange_reference(df, date_reference)
# exchange = pd.DataFrame.from_dict(exchange, orient="index")

# df['pct'] = df.apply(lambda x : ((x['implicit_exchange']/exchange.loc[x["currency_code"]])-1)*100, axis = 1 )

# ### Drop Null values:

# df.dropna(subset=['pct'], inplace=True)

In [None]:
# df.info()

In [None]:
# ### Saving on DB Version 2

# engine = create_engine('sqlite:///data/CurrenciesDataBase_V2.db')
# print(type(engine))
# df.to_sql(name="currencies_vs_btc", con=engine, if_exists="append", index=False)

In [None]:
now = dt.datetime.now()
print("Last Execution: ")
print(str(now))
print('Download and transform API data completed')

# Load raw table to S3

- Landing folder stores parquet files by day

In [None]:
file_name = f'./data/parquet/{time_stamp_utc}.parquet'
object_name = f"landing/{time_stamp_utc}.parquet"
upload_file(file_name, "currencytracker", object_name)

- Stage folder stores one file with all historical data

In [None]:
file_name = f'./data/historical_data.parquet'
object_name = f"stage/{time_stamp_utc}.parquet"
upload_file(file_name, "currencytracker", object_name)