In [21]:
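# One-time environment setup: uncomment the lines you need, run once, then re-comment.
# `%pip` installs into this kernel's environment. For kafka-python use either the
# pip line or the conda line; `-y` keeps conda from stopping at its
# "Proceed ([y]/n)?" prompt, which a notebook cell cannot answer.
#%pip install pycoingecko
#%pip install kafka-python
#!conda install -y -c conda-forge kafka-python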

Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /Users/Spencer/opt/anaconda3

  added / updated specs:
    - kafka-python


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    kafka-python-2.0.2         |     pyh9f0ad1d_0         163 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         163 KB

The following NEW packages will be INSTALLED:

  kafka-python       conda-forge/noarch::kafka-python-2.0.2-pyh9f0ad1d_0


Proceed ([y]/n)? ^C

CondaSystemExit: 
Operation aborted.  Exiting.



In [33]:
import pandas as pd
import json
from pycoingecko import CoinGeckoAPI
from kafka import KafkaConsumer, KafkaProducer

In [46]:
def get_crypto_data(days=2, bootstrap_servers="localhost:9092", topic="crypto_data"):
    """Fetch recent USD prices from CoinGecko and publish them to a Kafka topic.

    For each coin in a fixed list, the price history for the last ``days`` days
    is requested from CoinGecko. The per-coin series are combined into one
    long-format table with the columns ``datetime``, ``date``, ``time``,
    ``currency_name``, ``value`` and ``coin_symbol``. The table is printed and
    then sent to Kafka as a single message: the JSON-encoded, UTF-8 encoded
    text rendering of the table.

    Coins whose request fails, or which come back without any price points,
    are reported on stdout and skipped (best effort).

    Args:
        days: Number of days of history to request per coin.
        bootstrap_servers: Kafka broker address(es) handed to ``KafkaProducer``.
        topic: Name of the Kafka topic the message is sent to.

    Returns:
        None.

    Raises:
        ValueError: If no coin returned any price data.
    """
    crypto_list = [
        "bitcoin",
        "ethereum",
        "ripple",
        "bitcoin-cash",
        "cardano",
        "bitcoin-cash-sv",
        "litecoin",
        "chainlink",
        "binancecoin",
        "eos",
        "tron",
    ]

    # CoinGecko coin id -> symbol written to the ``coin_symbol`` column.
    # NOTE(review): TRON is usually listed under the ticker "TRX"; "TRON" is
    # kept because downstream consumers may already key on it - confirm.
    symbols = {
        "bitcoin": "BTC",
        "ethereum": "ETH",
        "ripple": "XRP",
        "bitcoin-cash": "BCH",
        "cardano": "ADA",
        "bitcoin-cash-sv": "BSV",
        "litecoin": "LTC",
        "chainlink": "LINK",
        "binancecoin": "BNB",
        "tron": "TRON",
        "eos": "EOS",
        "tether": "USDT",
    }

    # connect to API
    api = CoinGeckoAPI()

    # connect to kafka
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, api_version=(0, 11, 5))

    try:
        # getting crypto prices
        prices = {}
        for coin in crypto_list:
            try:
                chart = api.get_coin_market_chart_by_id(id=coin, vs_currency="usd", days=days)
                # "prices" is a list of [timestamp_ms, price] pairs; split it
                # into two parallel tuples. An empty list fails to unpack here
                # and is handled like any other per-coin failure.
                timestamps, values = zip(*chart["prices"])
            except Exception as e:
                # Best effort: report the coin that failed and carry on.
                print(e)
                print("coin: " + coin)
                continue
            # Only record a coin once both series exist, so a failed coin
            # never leaves a half-built entry behind.
            prices[coin] = {"timestamps": timestamps, "values": values}

        if not prices:
            raise ValueError("no price data could be fetched for any coin")

        frame_list = [
            pd.DataFrame(prices[coin]["values"], index=prices[coin]["timestamps"], columns=[coin])
            for coin in crypto_list
            if coin in prices
        ]

        # formatting dataset: one column per coin, indexed by timestamp (ms)
        df_crypto = pd.concat(frame_list, axis=1).sort_index()
        df_crypto["datetime"] = pd.to_datetime(df_crypto.index, unit="ms")
        df_crypto["date"] = df_crypto["datetime"].dt.date
        df_crypto["time"] = df_crypto["datetime"].dt.time

        # Wide -> long. Coins are sampled at different instants, so the wide
        # table is mostly NaN; those rows are dropped after melting. The row
        # labels are deliberately not reset so the printed table is unchanged.
        df_crypto = (
            df_crypto
            .melt(id_vars=["datetime", "date", "time"], var_name="currency_name", ignore_index=True)
            .dropna()
            # adding symbol to dataset
            .assign(coin_symbol=lambda frame: frame["currency_name"].map(symbols))
        )

        # convert df to string
        table_text = df_crypto.to_string()
        print(table_text)

        # sending formatted dataset to kafka
        # NOTE(review): the payload is the pretty-printed table wrapped in one
        # JSON string, not structured records; kept as-is for existing consumers.
        producer.send(topic, json.dumps(table_text).encode("utf-8"))
        # send() only queues the record; flush() blocks until it is delivered.
        producer.flush()
    finally:
        # Always release the broker connection, even when fetching/formatting fails.
        producer.close()
    

In [47]:
# Run the pipeline once: fetch the coin prices, print the formatted table
# (the cell output below), and send it to the 'crypto_data' Kafka topic.
get_crypto_data()

                    datetime        date             time    currency_name         value coin_symbol
8    2022-01-04 20:03:57.863  2022-01-04  20:03:57.863000          bitcoin  45998.577746         BTC
16   2022-01-04 21:01:31.596  2022-01-04  21:01:31.596000          bitcoin  46346.543890         BTC
27   2022-01-04 22:02:04.982  2022-01-04  22:02:04.982000          bitcoin  46294.005984         BTC
38   2022-01-04 23:02:06.283  2022-01-04  23:02:06.283000          bitcoin  46164.781388         BTC
46   2022-01-05 00:01:01.450  2022-01-05  00:01:01.450000          bitcoin  45938.024272         BTC
63   2022-01-05 01:07:20.565  2022-01-05  01:07:20.565000          bitcoin  46309.780484         BTC
71   2022-01-05 02:03:15.428  2022-01-05  02:03:15.428000          bitcoin  46243.706712         BTC
85   2022-01-05 03:06:04.396  2022-01-05  03:06:04.396000          bitcoin  46370.156714         BTC
91   2022-01-05 04:00:17.638  2022-01-05  04:00:17.638000          bitcoin  46519.152928   