### Dataset - Bitcoin (Yahoo Finance) + sentimento da Wikipedia

**tipo de ingestão**: full load

**origem**: Yahoo Finance, Wikipedia

**destino**: bronze/raw/bitcoin/btc_sent.csv

**formato do dado no data lake**: .csv

**objetivo**: Predições do modelo


#### Instruções

1. Renomear o arquivo `.env_exemplo` para `.env`
2. Popular as variáveis de ambiente (por exemplo, `CONNECTION_STRING_DL`) conforme o padrão de nomenclatura que você utilizar

In [None]:
from azure.storage.blob import BlobClient
import pandas as pd
from transformers import pipeline
import yfinance as yf
import mwclient
import time
import requests
from datetime import datetime
from statistics import mean
from io import StringIO
from dotenv import load_dotenv
import os

# Load environment variables from the local .env file; CONNECTION_STRING_DL
# is read from the environment later by connect_to_adls.
load_dotenv()

In [None]:
# First day of the analysis window: used as the start of the BTC price
# history, of the Wikipedia revision listing and of the daily date range.
data_inicio = datetime(2018, 1, 1)

# Hugging Face text classifier used by find_sentiment to score revision comments.
sentiment_pipeline = pipeline(
    model="distilbert-base-uncased-finetuned-sst-2-english"
)

Extrai os dados de bitcoin do Yahoo Finance

In [3]:
def extract_btc(data_inicio: datetime) -> pd.DataFrame:
    """Download the BTC-USD price history from Yahoo Finance.

    Args:
        data_inicio: first date of the history to request.

    Returns:
        The DataFrame produced by ``yfinance.Ticker("BTC-USD").history``.
    """
    return yf.Ticker("BTC-USD").history(start=data_inicio)

In [4]:
def format_base(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize the raw yfinance price frame.

    Works on a copy, so the caller's DataFrame is left untouched.

    Args:
        df: price history as returned by ``extract_btc``.

    Returns:
        A new DataFrame with a tz-naive DatetimeIndex, without the
        ``Dividends`` and ``Stock Splits`` columns, and with lower-case
        column names.
    """
    out = df.copy()
    # Drop the timezone so the index can be aligned with the tz-naive daily
    # sentiment index built from "YYYY-MM-DD" strings.
    out.index = pd.to_datetime(out.index).tz_localize(None)
    # Remove the unwanted columns; tolerate their absence in the source frame.
    out = out.drop(columns=["Dividends", "Stock Splits"], errors="ignore")
    out.columns = [c.lower() for c in out.columns]
    return out

Extrai as revisões (edições) da página "Bitcoin" da Wikipedia em inglês

In [5]:
def extract_reviews() -> list:
    """Fetch the revisions of the English Wikipedia "Bitcoin" page.

    The listing starts at the module-level ``data_inicio`` and walks forward
    in time (``dir="newer"``).

    Returns:
        The mwclient revision records, ordered by their ``timestamp`` field.
    """
    wiki = mwclient.Site("en.wikipedia.org")
    history = wiki.pages["Bitcoin"].revisions(start=data_inicio, dir="newer")
    return sorted(history, key=lambda revision: revision["timestamp"])

In [6]:
def find_sentiment(text):
    """Return a signed sentiment score for *text*.

    Only the first 250 characters are classified. The classifier's score is
    returned unchanged, or negated when the predicted label is "NEGATIVE".
    """
    prediction = sentiment_pipeline([text[:250]])[0]
    if prediction["label"] == "NEGATIVE":
        return -prediction["score"]
    return prediction["score"]

In [7]:
def format_edits() -> dict:
    """Group the page revisions by day and score each revision comment.

    Returns:
        A dict keyed by the revision day formatted as "%Y-%m-%d". Each value
        holds ``sentiments`` (one find_sentiment score per revision, with a
        missing comment scored as "") and ``edit_count`` (revisions that day).
    """
    per_day = {}
    for revision in extract_reviews():
        day = time.strftime("%Y-%m-%d", revision["timestamp"])
        bucket = per_day.setdefault(day, {"sentiments": [], "edit_count": 0})
        bucket["edit_count"] += 1
        bucket["sentiments"].append(find_sentiment(revision.get("comment", "")))
    return per_day

In [8]:
def clean_sentiment_base(sentiment_edits: dict) -> dict:
    """Replace each day's list of sentiment scores with two summary values.

    For every day, the returned entry keeps all original fields except
    ``sentiments`` and adds:

      * ``sentiment``: mean of the day's scores (0 when there are none);
      * ``neg_sentiment``: fraction of the day's scores that are negative
        (0 when there are none).

    The input dict is not modified; a new dict is returned.

    Args:
        sentiment_edits: mapping day -> dict with at least a ``sentiments``
            list, as produced by ``format_edits``.

    Returns:
        A new dict with the same day keys and the summarized entries.

    Raises:
        KeyError: if a day entry has no ``sentiments`` key.
    """
    cleaned = {}
    for day, entry in sentiment_edits.items():
        scores = entry["sentiments"]
        # Copy every other field so the caller's nested dicts stay intact.
        summary = {k: v for k, v in entry.items() if k != "sentiments"}
        if scores:
            summary["sentiment"] = mean(scores)
            summary["neg_sentiment"] = sum(1 for s in scores if s < 0) / len(scores)
        else:
            summary["sentiment"] = 0
            summary["neg_sentiment"] = 0
        cleaned[day] = summary
    return cleaned

In [9]:
def create_edits_df() -> pd.DataFrame:
    """Build a per-day DataFrame of edit counts and sentiment summaries.

    Rows are indexed by the revision day parsed into a DatetimeIndex; columns
    are the per-day fields produced by ``clean_sentiment_base``.
    """
    per_day = clean_sentiment_base(format_edits())
    frame = pd.DataFrame.from_dict(per_day, orient="index")
    frame.index = pd.to_datetime(frame.index)
    return frame

In [10]:
def improve_edits_df(
    edits_df: pd.DataFrame,
    start: "datetime | None" = None,
    end: "datetime | None" = None,
    window: int = 30,
) -> pd.DataFrame:
    """Turn the sparse per-day edits frame into lagged rolling features.

    Steps:
      1. Reindex onto every calendar day from *start* to *end*; days without
         revisions get 0 in every column.
      2. Shift ``edit_count``, ``sentiment`` and ``neg_sentiment`` down by one
         row, so each day only carries the previous day's values.
      3. Take a rolling mean over *window* rows with ``min_periods=window``;
         leading rows without a full window of non-NaN values are NaN.

    Args:
        edits_df: frame from ``create_edits_df`` (DatetimeIndex, columns
            ``edit_count``, ``sentiment``, ``neg_sentiment``).
        start: first day of the date range; defaults to the module-level
            ``data_inicio``.
        end: last day of the date range; defaults to ``datetime.today()``.
        window: rolling-window length in rows (days); defaults to 30.

    Returns:
        A new DataFrame with the rolling means; *edits_df* is not modified.
    """
    if start is None:
        start = data_inicio
    if end is None:
        end = datetime.today()
    dates = pd.date_range(start=start, end=end)
    edits_df = edits_df.reindex(dates, fill_value=0)
    # Lag by one day. Presumably this avoids using same-day information
    # when predicting that day — confirm with the model owner.
    for column in ("edit_count", "sentiment", "neg_sentiment"):
        edits_df[column] = edits_df[column].shift(1)
    return edits_df.rolling(window, min_periods=window).mean()

In [11]:
def get_sentiment_df() -> pd.DataFrame:
    """Return the daily rolling sentiment features for the Bitcoin page."""
    return improve_edits_df(create_edits_df())

Une as bases de bitcoin e sentimentos

In [12]:
def merge_dfs() -> pd.DataFrame:
    """Join the formatted BTC prices with the sentiment features on their date index.

    Uses an inner join, so only days present in both frames are kept.
    """
    prices = format_base(extract_btc(data_inicio))
    sentiment = get_sentiment_df()
    return prices.merge(
        sentiment,
        how="inner",
        left_index=True,
        right_index=True,
    )

Cria colunas adicionais ao dataframe existente para verificar possíveis tendências em horizontes de tempo

In [13]:
def trends_col(df: pd.DataFrame, horizons=(2, 7, 365)) -> pd.DataFrame:
    """Add trend features computed over several rolling horizons.

    For every horizon ``h``, two columns are appended:

      * ``close_ratio_h``: ``close`` divided by its rolling mean over the
        last ``h`` rows;
      * ``edit_h``: rolling mean of ``edit_count`` over the last ``h`` rows.

    Windows use ``min_periods=1``, so the first rows are averaged over the
    rows available instead of producing NaN.

    Args:
        df: frame with at least ``close`` and ``edit_count`` columns.
        horizons: iterable of window sizes in rows; defaults to (2, 7, 365).

    Returns:
        A copy of *df* with the new columns; *df* itself is not modified.
    """
    out = df.copy()
    # Only these two columns feed the features. Rolling the whole frame (which
    # also includes columns added in earlier iterations) was wasted work and
    # would fail on non-numeric columns.
    base = out[["close", "edit_count"]]
    for horizon in horizons:
        rolling_averages = base.rolling(horizon, min_periods=1).mean()
        out[f"close_ratio_{horizon}"] = out["close"] / rolling_averages["close"]
        out[f"edit_{horizon}"] = rolling_averages["edit_count"]
    return out

In [14]:
def get_data() -> pd.DataFrame:
    """Build the final feature table: merged prices/sentiment plus trend columns."""
    return trends_col(merge_dfs())

In [None]:
# Build the full feature table and display its most recent row.
df = get_data()
df.iloc[-1:]

Ingestão no Azure Data Lake Storage

In [20]:
def connect_to_adls(container_name: str, blob_name: str):
    """Return a BlobClient for *blob_name* inside *container_name*.

    The storage connection string is read from the ``CONNECTION_STRING_DL``
    environment variable; a KeyError is raised if it is not set.
    """
    return BlobClient.from_connection_string(
        conn_str=os.environ["CONNECTION_STRING_DL"],
        container_name=container_name,
        blob_name=blob_name,
    )

In [None]:
def upload_data(container_name: str, df: pd.DataFrame, csv_name: str = "btc_sent"):
    """Upload *df* as CSV to ``raw/bitcoin/<csv_name>.csv`` in *container_name*.

    The index is written to the CSV and any existing blob is overwritten.
    Failures are printed rather than raised.

    Args:
        container_name: target storage container (e.g. ``"bronze"``).
        df: DataFrame to serialize.
        csv_name: blob file name without extension; defaults to ``"btc_sent"``.

    Returns:
        True if the upload succeeded, False otherwise.
    """
    blob_name = f"raw/bitcoin/{csv_name}.csv"
    try:
        blob = connect_to_adls(container_name, blob_name)
        # Serialize the DataFrame to an in-memory CSV string.
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=True)
        blob.upload_blob(csv_buffer.getvalue(), overwrite=True)
    except Exception as e:
        print(f"Upload csv to raw failed.{e}")
        # Explicit failure value (previously fell through to an implicit None).
        return False
    print('Upload csv to raw sucessful')
    return True

# Upload the feature table to the "bronze" container.
upload_data(container_name="bronze", df=df)