In [1]:
!pip install TwitterApi

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting TwitterApi
  Downloading TwitterAPI-2.8.2.tar.gz (12 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: TwitterApi
  Building wheel for TwitterApi (setup.py) ... [?25l[?25hdone
  Created wheel for TwitterApi: filename=TwitterAPI-2.8.2-py3-none-any.whl size=14881 sha256=c555235d21b9cf90f9807838fa0b4c55301fc1401a12361a3bdb2527393594e3
  Stored in directory: /root/.cache/pip/wheels/aa/d3/ce/649017b934d001f7113c8f461dedf29da03e4cbe2894d5f145
Successfully built TwitterApi
Installing collected packages: TwitterApi
Successfully installed TwitterApi-2.8.2


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=5e793bc63159711d63f5dddde51e6c3b204d3c7c2fe25cb9519fe30a1b612544
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
%load_ext sql

In [4]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql.functions import year, month, dayofweek, hour, weekofyear, date_format, \
                                  udf, col, lit, struct, isnan, count, when


In [5]:
import pandas as pd
import requests
import datetime
import time
import json
import csv

In [6]:
from urllib.parse import urljoin
from functools import reduce

In [7]:
from TwitterAPI import TwitterAPI

In [8]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .appName("ETL_CryptocurrencyInsight") \
    .getOrCreate()

In [None]:
def extract_tradingdata(symbol):
    """
    Description: Collect URL and all parameters. The interval is set to 1d fetching the daily close.
                 No specific headers needed as it is a public API.
                 Note: To avoid IP ban the number of API calls muste be < 100 calls/minute.
    Parameters:
        symbol: Describes the traidingpair (cryptocurrency asset) for which trading data being requested.
    Returns:
        response: Contains the response from the API call to the cryptocurrency exchange.
    """
    
    baseurl = "https://api.binance.com"
    path = "/api/v3/klines"
    headers = None
    params = {
        'symbol': symbol,
        'interval': '1d'
    }
    url = urljoin(baseurl, path)
    
    # Requesting data from cryptocurrency exchange for specific symbol
    response = requests.get(url, headers=headers, params=params)
    
    print("Success: Call to crypto currency exchange executed")
    return response
    
def get_tradingdata(symbol):
    """
    Description: The function is calling the API of an cryptocurrency exchange getting the trading details for
                 for a specific cryptocurrency asset. Afterwards it transforms the trading data.
    Parameters:
        symbol: Describes the traidingpair (cryptocurrency asset) for which trading data being requested.
    Returns:
        df_marketdata: Spark dataframe containing the trading data which are matched with the corresponding symbol.
    """
    
    # Execute the trading data extraction from datasource
    response = extract_tradingdata(symbol)

    # If the response is successful and data are being fetched prepare dataframe containing marketdata
    if response.status_code == 200:

        # Extract trading details and make them available to the application
        # Define columns for dataframe based on API description from cryptocurrency exchange. 
        # Source: https://github.com/binance-exchange/binance-official-api-docs/blob/master/rest-api.md#general-api-information
        content_tradingdata = response.json()   
        columns_tradingdata = ['open_time', 
                              'open', 
                              'high', 
                              'low', 
                              'close', 
                              'volume', 
                              'close_time', 
                              'quote_asset_volume', 
                              'number_of_trades', 
                              'taker_buy_base_asset_volume', 
                              'taker_buy_quote_asset_volume', 
                              'unclassfied']
        df_tradingdata = spark.createDataFrame(content_tradingdata, columns_tradingdata)

        # Overwritting trading dataframe by adjusting datatypes and adding symbol column
        # ensuring the trading details are assigned to the correct cryptocurrency
        df_tradingdata = df_tradingdata.selectExpr("cast(open_time as long) open_time", 
                                                  "cast(open as decimal(20,10)) open", 
                                                  "cast(high as decimal(20,10)) high", 
                                                  "cast(low as decimal(20,10)) low", 
                                                  "cast(close as decimal(20,10)) close",
                                                  "cast(volume as decimal(20,10)) volume",
                                                  "cast(close_time as long) close_time",
                                                  "cast(quote_asset_volume as decimal(20,10)) quote_asset_volume",
                                                  "cast(number_of_trades as integer) number_of_trades",
                                                  "cast(taker_buy_base_asset_volume as decimal(20,10)) taker_buy_base_asset_volume",
                                                  "cast(taker_buy_quote_asset_volume as decimal(20,10)) taker_buy_quote_asset_volume") \
                                                  .withColumn("symbol", lit(symbol))
        
        print("Success: Trading data were extracted")
        return df_tradingdata

    else:
        print("Crypto Currency API not available. Please try later")  

def get_marketdata(trading_pair_symbols):       
    """
    Description: Creates a dataframe containing trading data for all requested cryptocurrency assets (marketdata).
    Parameters: 
        trading_pair_symbols: List containing cryptocurrency assets symbols.
    Returns: 
        None
    """
    
    # Merges dataframes containing trading details for each cryptocurrency into one dataframe containing all data (marketdata)
    series_tradingdata = []
    for trading_pair_symbol in trading_pair_symbols:
        series_tradingdata.append(get_tradingdata(trading_pair_symbol))
    
    # Execute DAG and combine collection of trading data dataframes 
    return reduce(DataFrame.unionAll, series_tradingdata)

def transform_marketdata(df_marketdata):       
    """
    Description: Process marketdata and split it into three tables following star schema approach.
                 The fact table (df_marketdata_trade) created contains trading metrics. The time table 
                 (df_marketdata_time) is descriptive and contains related trading times. The symbol 
                 table (df_marketdata_symbol) contains symbols (trading pairs). The relation between the
                 dataframes is described by the closing time (id).             
    Parameters: 
        df_marketdata: Contains all marketdata as result from data extraction
    Returns: 
        None
    """
    # @udf: Extract datetime based on the timestamp (in ms)
    get_datetime_udf = udf(lambda ts: datetime.datetime.fromtimestamp(ts / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
    
    # Convert timestamp to datetime and rename closing time to id
    # Note: The closing time acts as id as it is unique and allows to combine different marketdata dataframes 
    df_marketdata = df_marketdata.withColumn("datetime", get_datetime_udf(df_marketdata.close_time)) \
                                 .withColumnRenamed("close_time","id")
    
    # Ensure data quality dropping null values and duplicates in id. 
    df_marketdata.na.drop(subset=["id"]).dropDuplicates(["id"])
    
    # Create time table dataframe for marketdata. The id is expressed by the closing time. 
    df_marketdata_time = df_marketdata.select(["id",
                                               month("datetime").alias("month"),
                                               year("datetime").alias("year"),
                                               dayofweek("datetime").alias("weekday"),
                                               weekofyear("datetime").alias("weekofyear"),
                                               "datetime"])
    
    # Create dataframe trade table containing all revelant details.
    df_marketdata_trade = df_marketdata.select("id",
                                               "open",
                                               "close",
                                               "high",
                                               "low",
                                               "quote_asset_volume",
                                               "number_of_trades",
                                               "taker_buy_base_asset_volume",
                                               "taker_buy_quote_asset_volume")
    
    # Create dataframe trade table containing all revelant details.
    df_marketdata_symbol = df_marketdata.select("id",
                                                "symbol")
    
    print("Success: Marketdata were transformed")
    return df_marketdata_time, df_marketdata_trade, df_marketdata_symbol

## Receive all marketdata for cryptocurrency assets
trading_pair_symbols = ["BTCUSDT", "ETHUSDT", "XRPUSDT"]
df_marketdata = get_marketdata(trading_pair_symbols)

## Split the marketdata into different tables following star schema
df_marketdata_time, df_marketdata_trade, df_marketdata_symbol = transform_marketdata(df_marketdata)

Success: Call to crypto currency exchange executed
Crypto Currency API not available. Please try later
Success: Call to crypto currency exchange executed
Crypto Currency API not available. Please try later
Success: Call to crypto currency exchange executed
Crypto Currency API not available. Please try later


AttributeError: ignored

In [12]:
def setup_twitterAPI():       
    """
    Description:  Calls the twitter api wrapper and provides credentials to it.         
    Parameters: 
        None
    Returns: 
        twitterAPI: The prepared API wrapper allowing to send request to twitter api
    """
    # Provide all credentials to access the twitter API
    consumer_key = "tuopEJa4I562s5QRtCfTiZnrI"
    consumer_secret = "tGD8leV9o7aGTE5dEXx77jm8TF5W8vYMLBKRZDbz6bcjqbDIz8"
    access_token_key = "1231985298751590404-iIWijICrpkjTijmE1wn9818axnEBl1"
    access_token_secret = "Bsj5d241AFWw8n8MTprWqVSnB7m1OTDOfwP3iYTDNHuZ8"

    # Get the API from the Twitter API wrapper and provide credentials
    twitterAPI = TwitterAPI(consumer_key, 
                            consumer_secret, 
                            access_token_key, 
                            access_token_secret)
    
    print("Success: Twitter API was configured")
    return twitterAPI
    
def get_parameters():       
    """
    Description: Define all needed parameters to execute a twitter tweet search.
    Parameters: 
        None
    Returns: 
        parameters: Dictionary containing relevant search parameters
    """
    
    # The string in the search term contains a special parameter lang: which requesting only english tweets.
    search_term = "bitcoin lang:en"
    
    # Defines the twitter product being used. The "30 day" product allows to access the last 30 days.
    product = "Search Tweets"
    
    # Name of the twitter application defined.
    label = "simply_fire"
    
    #  The twitter search begins at the startDate and stops at the endDate. However, the search may stop before
    #  in case the number of allowed request is reached (30/minute or 250/month).
    startDate = "202205130000" 
    endDate = "202205140000"
    
    # The nextpage_token is necessary to enable pagination
    nextpage_token = None

    parameters = {"label": label,
                  "product": product,
                  "query": search_term,
                  "fromDate": startDate, 
                  "toDate": endDate,
                  "next": nextpage_token}
    
    print("Success: Parameters for Twitter Api were defined")
    return parameters

def get_twitterdata():       
    """
    Description: Execute get request and save received tweets a dictionary containing only releveant data from tweet. 
                 Note: The results and requests of the twitter api are strictly limited allowing 30 requests/minute and 
                 250 requests/month. In addition only 100 are shown per request (page). Therefore, the request need to be splitted 
                 (e.g. based on date) and executed several times applying next token.
                 Source:          
    Parameters: 
        None
    Returns: 
        cryptocurrency_tweets_pd: Dataframe containing all relevant cryptocurrency tweets for the desired time period
    """
    
    # Make Twitter Api available and define parameters for twitter search
    twitterAPI = setup_twitterAPI()
    parameters = get_parameters()
    
    # As the request is a nested json object containing data which are not needed
    # the relevant details are extracted and persisted in a dictionary 
    cryptocurrency_tweets_dict = {"id": [], 
                                  "date": [], 
                                  "text": [],
                                  "user": [], 
                                  "favorite_count": [], 
                                  "retweet_count": [], 
                                  "followers_count": []}

    # Execute actual twitter search based on search term considering pagination and english language
    # Filter out unnesseary tweet data and create dictionary containing relevant data
    while True:
        request = twitterAPI.request("tweets/search/%s/:%s" % (parameters["product"], parameters["label"]),
                                                              {"query": parameters["query"],
                                                               "fromDate": parameters["fromDate"],
                                                               "toDate": parameters["toDate"],
                                                               "next": parameters["next"]})
        twitter_tweets = request.json()
        if (request.status_code != 200):
            print("Error: Twitter API Call was not successful")
            request_status = False
            break
        for twitter_tweet in twitter_tweets["results"]:
            cryptocurrency_tweets_dict["id"].append(twitter_tweet["id"])
            cryptocurrency_tweets_dict["date"].append(twitter_tweet["created_at"])
            cryptocurrency_tweets_dict["text"].append(twitter_tweet["text"])
            cryptocurrency_tweets_dict["user"].append(twitter_tweet["user"]["screen_name"])
            cryptocurrency_tweets_dict["favorite_count"].append(twitter_tweet["favorite_count"])
            cryptocurrency_tweets_dict["retweet_count"].append(twitter_tweet["retweet_count"])
            cryptocurrency_tweets_dict["followers_count"].append(twitter_tweet["user"]["followers_count"])
        if "next" not in twitter_tweets:
            print("Success: Twitter API Call executed")
            request_status = True
            break
        parameters["next"] = twitter_tweets["next"]
    
    # To ease up post-processing use pandas to create dataframe containing all relevant cryptocurrency tweets
    cryptocurrency_tweets_pd = pd.DataFrame(cryptocurrency_tweets_dict)
    return cryptocurrency_tweets_pd, request_status

def persist_twitterdata(df_twitterdata, request_status):
    """
    Description: Persist the tweets in json format on the filesystem. This is necessary to decouple the from twitter api
                 due to result and call limitations. In case the Twitter Api limit is reached, no data will be persisted. 
                 In that case refer to existing data in datasource/twitter_tweets folder.
    Parameters: 
        df_twitterdata: Panda dataframe containing tweets for a specific period in the past
        request_status: In case the request was executed succesfully the status is set to True. Otherwise False. 
    Returns: 
        None
    """    
    
    if request_status == True:
        cryptocurrency_tweets_df = spark.createDataFrame(df_twitterdata)
        cryptocurrency_tweets_df.write.json(path="data/twitter_tweets", 
                                            mode="append")
        print("Success: Tweets were written into filesystem")
    else:
        print("Error: Tweets were not written into filesystem")

## Access twitter (30 day premium) api and receive tweets
df_twitterdata, request_status = get_twitterdata()

## Store twitter data on a local filesystem
persist_twitterdata(df_twitterdata, request_status)

Success: Twitter API was configured
Success: Parameters for Twitter Api were defined


Exception: ignored

In [None]:
def extract_twitterdata():       
    """
    Description: Load twitter data from external storage into a spark dataframe making it available for
                 denormalization
    Parameters: 
        None
    Returns: 
        cryptocurrency_tweets_df: Spark dataframe containing twitter tweets
    """
 
    # Ensure proper data structure by providing custom schema. Make data structure more robust
    # by ensuring that a value for id is provided
    fields = [StructField("id", LongType(), False),
              StructField("date", StringType(), True),
              StructField("text", StringType(), True),
              StructField("user", StringType(), True), 
              StructField("favorite_count", LongType(), True),
              StructField("retweet_count", LongType(), True),
              StructField("followers_count", LongType(), True)]
                 
    final_schema = StructType(fields)  
    
    # Read twitter data provided as json following the schema definition
    cryptocurrency_tweets_df = spark.read.json(path="datasource/twitter_tweets", schema=final_schema)
    print("Success: Twitter tweets were extracted")
    return cryptocurrency_tweets_df

## Process and denormalize tweets 
def transform_twitterdata(tweets_df):       
    """
    Description:              
    Parameters: 
        transform_twitterdata: Contains all cryptocurrency tweets from filesystem
    Returns: 
        None
    """  
    
    # @udf: Convert ctime format to datetime and remove UTC timezone value as utc time is applicable for the entire project
    get_time_value_sequence_datetime = udf(lambda dateTimeUtc: datetime.datetime.strptime(dateTimeUtc, "%a %b %d %H:%M:%S %z %Y") \
                                                                                .strftime("%Y-%m-%d %H:%M:%S"))
    
    # Ensure data quality dropping null values and duplicates in id. 
    tweets_df.na.drop(subset=["id"]).dropDuplicates(["id"])
    
    # Convert ctime format to datetime allowing to extract date segments within dataframe
    tweets_df = tweets_df.withColumn("datetime", get_time_value_sequence_datetime(tweets_df.date)) 
    
    # Contains time related information for each tweet splitted into time segments.
    tweets_time_df = tweets_df.select(["id",
                                       month("datetime").alias("month"),
                                       year("datetime").alias("year"),
                                       dayofweek("datetime").alias("weekday"),
                                       weekofyear("datetime").alias("weekofyear"),
                                       "datetime"])
    
    # Contains username and how many followers belong to a user
    tweets_user_df = tweets_df.select("id",
                                      "user",
                                      "followers_count")
    
    # Contains content related to an usertweet like the tweet message and how many times it was liked and retweeted.
    tweets_content_df = tweets_df.select("id",
                                         "text",
                                         "favorite_count",
                                         "retweet_count")
    
    # Return dataframes as tuple for post-processing
    print("Success: Tweet dataframes were transformed")
    return tweets_time_df, tweets_user_df, tweets_content_df

## Extract tweets from filesystem
tweets_df = extract_twitterdata()  
## Process cryptocurrency tweets and denormalize tweets into time, user and content tweets
tweets_time_df, tweets_user_df, tweets_content_df = transform_twitterdata(tweets_df)

AnalysisException: ignored