# Ingestion from a Jupyter Notebook

When working in a Jupuyter Notebook you can send data to Tinybird Data Sources using the full range of ingestion options.

This notebook walks through the options using the example of data from recent changes to Wikipedia.

**Options for ingesting data:**

1. Rest API
2. UI
3. CLI
4. High-frequency ingestion

**Example using pandas DataFrames for options 1-3.**

- create a Data Source from 5 minutes of data in `df_wiki`

- append 5 minutes of data to the Data Source from `df_wiki_new`

Based on
https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams



## Create pandas DataFrames

In [2]:
!pip install sseclient

Collecting sseclient
  Downloading sseclient-0.0.27.tar.gz (7.5 kB)
Building wheels for collected packages: sseclient
  Building wheel for sseclient (setup.py) ... [?25l[?25hdone
  Created wheel for sseclient: filename=sseclient-0.0.27-py3-none-any.whl size=5584 sha256=736b1bdd9b59e0dc27e2200fc1670e837de894768db1296727c3371a3f62ccea
  Stored in directory: /root/.cache/pip/wheels/07/67/7e/96edf627ac746de1a5c5cbb8d59ed960f033b8352dc12c545d
Successfully built sseclient
Installing collected packages: sseclient
Successfully installed sseclient-0.0.27


In [3]:
import json
import time
import pandas as pd

from sseclient import SSEClient as EventSource
from google.colab import files

In [4]:
def create_df_wiki(url='https://stream.wikimedia.org/v2/stream/recentchange', n=5):
  df_wiki = pd.DataFrame()
  t_end = time.time() + 60 * n
  change = {'timestamp': time.time()}
  for event in EventSource(url):
    if change['timestamp'] > t_end:
          break
    elif event.event == 'message':
          try:
              change = json.loads(event.data)
          except ValueError:
              pass
          else:
            if change['type']!='log':
              df=pd.DataFrame.from_dict(change)
              df_wiki=df_wiki.append(df[df.index=='domain'])
  return df_wiki

DataFrame of n minutes of data to create Data Source

In [5]:
df_wiki = create_df_wiki(n=5)
df_wiki.drop(columns=['$schema','length','revision'], inplace=True)

In [6]:
df_wiki.info()

<class 'pandas.core.frame.DataFrame'>
Index: 7147 entries, domain to domain
Data columns (total 16 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   meta                7147 non-null   object
 1   id                  7147 non-null   int64 
 2   type                7147 non-null   object
 3   namespace           7147 non-null   int64 
 4   title               7147 non-null   object
 5   comment             7147 non-null   object
 6   timestamp           7147 non-null   int64 
 7   user                7147 non-null   object
 8   bot                 7147 non-null   bool  
 9   server_url          7147 non-null   object
 10  server_name         7147 non-null   object
 11  server_script_path  7147 non-null   object
 12  wiki                7147 non-null   object
 13  parsedcomment       7147 non-null   object
 14  minor               4501 non-null   object
 15  patrolled           2861 non-null   object
dtypes: bool(1), int64(3), 

DataFrame of 5 minutes of data to append to Data Source

In [7]:
df_wiki_new = create_df_wiki(n=5)
df_wiki_new.drop(columns=['$schema','length','revision'], inplace=True)

## Option 1: Ingest to Tinybird using the Rest API

In [8]:
if token == '':
   print("Get your token from your Tinybird workspace.")

### Create Data Source from a CSV File

Column names read from first row, column data types inferred.

In [9]:
header = f'"Authorization: Bearer {token}"'

name = 'wiki_api_csv'
mode = 'create'
url = 'https://api.tinybird.co/v0/datasources'
endpoint = f'"{url}?mode={mode}&name={name}"'

filename = name + 'csv'
df_wiki.to_csv(filename, index=False)

!curl -H $header -X POST $endpoint -F csv=@{filename}

{
    "import_id": "d9179b24-4e44-4212-a56e-66389a3033de",
    "datasource": {
        "id": "t_b2260a6bf31a47c68b525b72f6c53891",
        "name": "wiki_api_csv",
        "cluster": null,
        "tags": {},
        "created_at": "2022-02-10 17:36:52.762554",
        "updated_at": "2022-02-10 17:36:53.155813",
        "replicated": false,
        "version": 0,
        "project": null,
        "headers": {
            "dialect": {
                "header": "['meta', 'id', 'type', 'namespace', 'title', 'comment', 'timestamp', 'user', 'bot', 'server_url', 'server_name', 'server_script_path', 'wiki', 'parsedcomment', 'minor', 'patrolled']",
                "header_hash": -6378930830185933471
            }
        },
        "shared_with": [],
        "engine": {
            "engine": "MergeTree",
            "partition_key": "substring(meta, 1, 1)",
            "sorting_key": "meta, cityHash64(title)",
            "sampling_key": "cityHash64(title)"
        },
        "used_by": [],
      

### Append to Data Source from a CSV File

In [10]:
filename = 'wiki_new_api_csv.csv'
df_wiki_new.to_csv(filename, index=False)

mode = 'append'
endpoint = f'"{url}?mode={mode}&name={name}"' 

!curl -H $header -X POST $endpoint -F csv=@{filename}

{
    "import_id": "3e249531-a4bd-485b-80c1-684782e8d3d8",
    "datasource": {
        "id": "t_b2260a6bf31a47c68b525b72f6c53891",
        "name": "wiki_api_csv",
        "cluster": null,
        "tags": {},
        "created_at": "2022-02-10 17:36:52.762554",
        "updated_at": "2022-02-10 17:36:58.186064",
        "replicated": false,
        "version": 0,
        "project": null,
        "headers": {
            "dialect": {
                "header": "['meta', 'id', 'type', 'namespace', 'title', 'comment', 'timestamp', 'user', 'bot', 'server_url', 'server_name', 'server_script_path', 'wiki', 'parsedcomment', 'minor', 'patrolled']",
                "header_hash": -6378930830185933471
            },
            "cached_delimiter": ","
        },
        "shared_with": [],
        "engine": {
            "engine": "MergeTree",
            "partition_key": "substring(meta, 1, 1)",
            "sorting_key": "meta, cityHash64(title)",
            "sampling_key": "cityHash64(title)"
   

### Create Data Source from Data in Memory
based on https://gist.github.com/alrocar/9b1b860cf74ac6f2ad115c3cb2945e93

In [11]:
import csv
import requests

from io import StringIO
from requests.adapters import HTTPAdapter

from urllib3.util.retry import Retry
from urllib.parse import urlencode

In [12]:
def ingest_from_array(rows,
                      datasource, 
                      token, mode='append', 
                      endpoint='https://api.tinybird.co'):
  
  url = f'{endpoint}/v0/datasources?mode={mode}&name={datasource}'

  retry = Retry(total=5, backoff_factor=0.2)
  adapter = HTTPAdapter(max_retries=retry)
  _session = requests.Session()
  _session.mount('http://', adapter)
  _session.mount('https://', adapter)

  csv_chunk = StringIO()
  writer = csv.writer(csv_chunk, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

  records = 0
  for row in rows:
    writer.writerow(row)
    records += 1

    if len(rows) == records:
        data = csv_chunk.getvalue()
        headers = {
            'Authorization': f'Bearer {token}',
            'X-TB-Client': 'pltx-0.1',
        }

        ok = False
        try:
            response = _session.post(url, headers=headers, files=dict(csv=data))
            result = response.json()

            ok = response.status_code < 400
            if ok:
                csv_chunk = StringIO()
                writer = csv.writer(csv_chunk, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
                print(f"Flushed {len(data)} bytes, datasource={datasource}, response={response.status_code}")
                print(f"Result id={result.get('import_id', None)}, error={result.get('error', False)}")
        except Exception as e:
            print(e)

  print('Done')

Column names read from 'rows', column data types interpreted from 'rows'.

In [13]:
rows= df_wiki.values.tolist()
# put column names in 1st row
rows.insert(0, df_wiki.columns.tolist())

datasource = 'wiki_api_mem'
mode = 'create'
endpoint = 'https://api.tinybird.co'

ingest_from_array(rows, datasource, token, mode, endpoint)

Flushed 3228107 bytes, datasource=wiki_api_mem, response=200
Result id=1f64e26b-da70-4cc2-94dc-3b40f1586377, error=False
Done


### Append to Data Source from Data in Memory

In [14]:
mode = 'append'
rows= df_wiki_new.values.tolist()

ingest_from_array(rows, datasource, token, mode, endpoint)

Flushed 3542886 bytes, datasource=wiki_api_mem, response=200
Result id=0fd3d8ce-db02-4036-94f1-2f9465276000, error=False
Done


## Option 2: Download Local File then ingest to Tinybird through the UI

- CSV
- NDJSON

The column names and types can be changed in the preview in the UI, for example, the column `type` can be changed to `LowCardinality(String)`.

### Format CSV

In [15]:
df_wiki.to_csv("wiki_ui_csv.csv", index=False)
files.download('wiki_ui_csv.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

### Format NDJSON

In [16]:
df_wiki.to_json("wiki_ui_ndjson.ndjson", orient="records", lines=True, force_ascii=0)
files.download("wiki_ui_ndjson.ndjson")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Option 3: Ingest to Tinybird from the CLI
- CSV
- NDJSON

In [17]:
!pip install tinybird-cli

Collecting tinybird-cli
  Downloading tinybird_cli-1.0.0b95-py3-none-any.whl (84 kB)
[?25l[K     |███▉                            | 10 kB 14.9 MB/s eta 0:00:01[K     |███████▊                        | 20 kB 11.4 MB/s eta 0:00:01[K     |███████████▋                    | 30 kB 9.6 MB/s eta 0:00:01[K     |███████████████▌                | 40 kB 8.5 MB/s eta 0:00:01[K     |███████████████████▍            | 51 kB 5.2 MB/s eta 0:00:01[K     |███████████████████████▎        | 61 kB 5.5 MB/s eta 0:00:01[K     |███████████████████████████▏    | 71 kB 5.3 MB/s eta 0:00:01[K     |███████████████████████████████ | 81 kB 6.0 MB/s eta 0:00:01[K     |████████████████████████████████| 84 kB 2.0 MB/s 
Collecting requests==2.25.1
  Downloading requests-2.25.1-py2.py3-none-any.whl (61 kB)
[K     |████████████████████████████████| 61 kB 6.6 MB/s 
[?25hCollecting humanfriendly==8.2
  Downloading humanfriendly-8.2-py2.py3-none-any.whl (86 kB)
[K     |████████████████████████████████| 8

In [18]:
if token == '':
   print("Get your token from your Tinybird workspace.")

In [19]:
def write_text_to_file(filename, text):
  with open(filename, 'w') as f: f.write(text)

### Format CSV

In [20]:
df_wiki.to_csv("wiki_cli_csv.csv", index=False)

The schema for the Data Source can be generated from the CSV file or written from code. 

In [21]:
# generate the file wiki_cli_csv.datasource
!tb --token=$token datasource generate wiki_cli_csv.csv

[92m** Generated wiki_cli_csv.datasource
** => Create it on the server running: $ tb push wiki_cli_csv.datasource
** => Append data using: $ tb datasource append wiki_cli_csv wiki_cli_csv.csv
[0m


In [22]:
# or write the file wiki_cli_csv.datasource with data types, sorting key etc.
filename = 'wiki_cli_csv.datasource'
text='''
SCHEMA >
    `meta` LowCardinality(String),
    `id` Int64,
    `type` String,
    `namespace` Int16,
    `title` String,
    `comment` Nullable(String),
    `timestamp` Int64,
    `user` String,
    `bot` String,
    `minor` Nullable(String),
    `patrolled` Nullable(String),
    `server_url` LowCardinality(String),
    `server_name` LowCardinality(String),
    `server_script_path` String,
    `wiki` LowCardinality(String),
    `parsedcomment` Nullable(String)

ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp"
'''

write_text_to_file(filename, text)

In [23]:
!tb --token=$token push wiki_cli_csv.datasource
!tb --token=$token datasource append wiki_cli_csv wiki_cli_csv.csv

[0m** Processing wiki_cli_csv.datasource[0m
[0m** Building dependencies[0m
[0m** Running wiki_cli_csv [0m
[92m** 'wiki_cli_csv' created[0m
[0m** Not pushing fixtures[0m
[0m** 🥚 starting import process[0m
[92m** 🐥 done[0m
[92m** Appended 7017 new rows[0m
[92m** Total rows in wiki_cli_csv: 7147[0m
[92m** Data appended to Data Source 'wiki_cli_csv' successfully![0m
[0m** Data pushed to wiki_cli_csv[0m


### Format NDJSON

In [24]:
df_wiki.to_json("wiki_cli_ndjson.ndjson", orient="records", lines=True, force_ascii=0)

The schema for the Data Source can be generated from the NDJSON file or written from code. 

In [25]:
# generate the file wiki_cli_ndjson.datasource
!tb --token=$token datasource generate wiki_cli_ndjson.ndjson

[92m** Generated wiki_cli_ndjson.datasource
** => Create it on the server running: $ tb push wiki_cli_ndjson.datasource
** => Append data using: $ tb datasource append wiki_cli_ndjson wiki_cli_ndjson.ndjson
[0m


In [26]:
# or write the file wiki_cli_ndjson.datasource with data types, sorting key etc.
filename = 'wiki_cli_ndjson.datasource'
text='''
SCHEMA >

    bot UInt8 `json:$.bot`,
    comment Nullable(String) `json:$.comment`,
    id Int64 `json:$.id`,
    meta LowCardinality(String) `json:$.meta`,
    minor Nullable(UInt8) `json:$.minor`,
    namespace Int16 `json:$.namespace`,
    parsedcomment Nullable(String) `json:$.parsedcomment`,
    patrolled Nullable(UInt8) `json:$.patrolled`,
    server_name String `json:$.server_name`,
    server_script_path String `json:$.server_script_path`,
    server_url String `json:$.server_url`,
    timestamp Int64 `json:$.timestamp`,
    title String `json:$.title`,
    type String `json:$.type`,
    user String `json:$.user`,
    wiki LowCardinality(String) `json:$.wiki`
    
ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp"
'''

write_text_to_file(filename, text)

In [27]:
!tb --token=$token push wiki_cli_ndjson.datasource
!tb --token=$token datasource append wiki_cli_ndjson wiki_cli_ndjson.ndjson

[0m** Processing wiki_cli_ndjson.datasource[0m
[0m** Building dependencies[0m
[0m** Running wiki_cli_ndjson [0m
[92m** 'wiki_cli_ndjson' created[0m
[0m** Not pushing fixtures[0m
[0m** 🥚 starting import process[0m
[92m** 🐥 done[0m
[92m** Appended 0 new rows[0m
[92m** Total rows in wiki_cli_ndjson: 7147[0m
[92m** Data appended to Data Source 'wiki_cli_ndjson' successfully![0m
[0m** Data pushed to wiki_cli_ndjson[0m


## Option 4: Stream to Tinybird using High-Frequency Ingestion
Here events are streamed directly to the Data Source from the Wikipedia stream using [high-frequency ingestion](https://guides.tinybird.co/guide/high-frequency-ingestion). The data is not first written to a pandas DataFrame.

With `mode='create'` the data types are inferred. To avoid rows going into quarantine, a few more columns  need to be `Nullable` than inferred. Directly defining the schema after exploring the automatically created Data Source in the UI solves this issue. 

In [28]:
!pip install sseclient
!pip install tinybird-cli -q -U



In [29]:
import json
import requests
import time

import pandas as pd
from sseclient import SSEClient as EventSource

In [30]:
if token == '':
   print("Get your token from your Tinybird workspace.")

In [31]:
def write_text_to_file(filename, text):
  with open(filename, 'w') as f: f.write(text)

In [32]:
filename = 'wiki_hfi.datasource'
text='''
SCHEMA >
    `DOLLAR_SIGN_schema` String `json:$.['$schema']`,
    `bot` UInt8 `json:$.bot`,
    `comment` String `json:$.comment`,
    `id` Int64 `json:$.id`,
    `length_new` Nullable(Int32) `json:$.length.new`,
    `length_old` Nullable(Int32) `json:$.length.old`,
    `meta_domain` String `json:$.meta.domain`,
    `meta_dt` DateTime `json:$.meta.dt`,
    `meta_id` String `json:$.meta.id`,
    `meta_offset` Int64 `json:$.meta.offset`,
    `meta_partition` Int16 `json:$.meta.partition`,
    `meta_request_id` String `json:$.meta.request_id`,
    `meta_stream` String `json:$.meta.stream`,
    `meta_topic` String `json:$.meta.topic`,
    `meta_uri` String `json:$.meta.uri`,
    `minor` Nullable(UInt8) `json:$.minor`,
    `namespace` Int16 `json:$.namespace`,
    `parsedcomment` String `json:$.parsedcomment`,
    `patrolled` Nullable(UInt8) `json:$.patrolled`,
    `revision_new` Nullable(Int64) `json:$.revision.new`,
    `revision_old` Nullable(Int64) `json:$.revision.old`,
    `server_name` String `json:$.server_name`,
    `server_script_path` String `json:$.server_script_path`,
    `server_url` String `json:$.server_url`,
    `timestamp` Int64 `json:$.timestamp`,
    `title` String `json:$.title`,
    `type` String `json:$.type`,
    `user` String `json:$.user`,
    `wiki` String `json:$.wiki`

ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp"
'''

write_text_to_file(filename, text)
!tb --token=$token push wiki_hfi.datasource

[0m** Processing wiki_hfi.datasource[0m
[0m** Building dependencies[0m
[0m** Running wiki_hfi [0m
[92m** 'wiki_hfi' created[0m
[0m** Not pushing fixtures[0m


In [34]:
url = 'https://api.tinybird.co/v0/events'
mode = 'append'
datasource = 'wiki_hfi'
n = 1 # minutes of data from stream

params = {
        'mode': mode,
        'name': datasource,
        'token': token
        }
t_end = time.time() + n*60
change = {'timestamp': time.time()}
print('Start time:',pd.Timestamp(change['timestamp'], unit='s'))
for event in EventSource('https://stream.wikimedia.org/v2/stream/recentchange'):
      if change['timestamp'] > t_end:
        break
      elif event.event == 'message':
        try:
              change = json.loads(event.data)
        except ValueError:
              pass
        else:
            if change['type']!='log':
              r = requests.post(url, 
                                params=params, 
                                data=json.dumps(change))
print('Final timestamp:', pd.Timestamp(change['timestamp'], unit='s'))

Start time: 2022-02-10 17:37:45.451096535
Final timestamp: 2022-02-10 17:38:46
