# **Chapter 5**
## *Persisting Time Series Data to Databases*

This chapter builds on **Chapter 4**, *Persisting Time Series Data to Files*, and focuses on writing data at scale. It covers different techniques for writing data to relational and non-relational database systems like those discussed in **Chapter 3**, *Reading Time Series Data from Databases*, including on-premises and cloud services.

The goal of this chapter is to give you hands-on experience with different methods for connecting to these database systems and persisting your time series DataFrame.  

Here is the list of the recipes that we will cover in this chapter:
* Writing time series data to a relational database (PostgreSQL and MySQL)
* Writing time series data to MongoDB
* Writing time series data to InfluxDB
* Writing time series data to Snowflake

# Recipe 1: Writing Data to Relational Databases

In the *Reading data from relational database* recipe in **Chapter 3**, *Reading Time Series Data from Databases*, you installed `sqlalchemy` and `psycopg2` for the read engine. For this recipe, you will be using these two libraries again.  

You will also use the `pandas-datareader` library to pull stock data.  

* To install the libraries using `Conda`, run the following:

```
conda install sqlalchemy psycopg2 pandas-datareader -y
```

* To install the libraries using `pip`, run the following:

```
pip install sqlalchemy psycopg2 pandas-datareader
```

## Writing Data to PostgreSQL

In [1]:
import pandas as pd
from sqlalchemy import create_engine
import pandas_datareader.data as web

engine = create_engine("postgresql://postgres:password@localhost:5432/postgres")

In [2]:
engine

Engine(postgresql://postgres:***@localhost:5432/postgres)
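
The string passed to `create_engine` packs the dialect, optional driver, credentials, host, port, and database name into one URL. As a rough illustration only (this is not how SQLAlchemy parses it internally), the sketch below uses the standard library's `urlsplit` to pull apart the PostgreSQL connection string above and the MySQL one used later in this recipe. The helper name `describe_db_url` is hypothetical.

```python
from urllib.parse import urlsplit


def describe_db_url(url):
    """Hypothetical helper: split a database URL into its named parts."""
    parts = urlsplit(url)
    # The scheme is either "dialect" or "dialect+driver", e.g. "mysql+pymysql".
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,  # None: no driver named in the URL
        "username": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


pg = describe_db_url("postgresql://postgres:password@localhost:5432/postgres")
my = describe_db_url("mysql+pymysql://root:password@localhost:3306/stocks")
print(pg)
print(my)
```

When no driver is named, as in the PostgreSQL URL, SQLAlchemy falls back to its default driver for that dialect.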

In [3]:
amzn_df_2020 = web.get_data_yahoo('AMZN', 
                                  start='2020-01-01', 
                                  end='2020-12-31')

In [4]:
amzn_df_2020.head()

| Date | High | Low | Open | Close | Volume | Adj Close |
|---|---|---|---|---|---|---|
| 2020-01-02 | 94.900497 | 93.207497 | 93.75 | 94.900497 | 80580000 | 94.900497 |
| 2020-01-03 | 94.309998 | 93.224998 | 93.224998 | 93.748497 | 75288000 | 93.748497 |
| 2020-01-06 | 95.184502 | 93.0 | 93.0 | 95.143997 | 81236000 | 95.143997 |
| 2020-01-07 | 95.694504 | 94.601997 | 95.224998 | 95.343002 | 80898000 | 95.343002 |
| 2020-01-08 | 95.550003 | 94.321999 | 94.902 | 94.598503 | 70160000 | 94.598503 |


In [5]:
amzn_df_2020.shape

(253, 6)

In [6]:
amzn_df_2020.to_sql('amazon',
                    engine,
                    if_exists='replace')

253

In [7]:
query = '''
SELECT EXISTS (
   SELECT FROM information_schema.tables 
   WHERE  table_schema = 'public'
   AND    table_name   = 'amazon'
   );
'''
engine.execute(query).fetchone()

(True,)

In [8]:
query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(253,)

In [9]:
amzn_df_2021 = web.get_data_yahoo('AMZN', 
                                start='2021-01-01', 
                                end='2021-06-01')

In [10]:
amzn_df_2021.to_sql('amazon',
                    engine,
                    if_exists='append')

103

In [11]:
amzn_df_2021.shape

(103, 6)

In [12]:
query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(356,)
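
The row counts above (253 after `replace`, 356 after `append`) follow from what the two `if_exists` modes do: `replace` drops the existing table before writing, while `append` adds rows to whatever is already there. Here is a minimal sketch of that idea, using an in-memory SQLite database from the standard library as a stand-in for PostgreSQL. The `write_rows` helper is hypothetical, and the SQL that pandas actually issues will differ in detail.

```python
import sqlite3

# A few (date, close) pairs copied from the outputs in this chapter.
rows_2020 = [
    ("2020-01-02", 94.900497),
    ("2020-01-03", 93.748497),
    ("2020-01-06", 95.143997),
]
rows_2021 = [("2021-01-04", 159.3314971923828)]


def write_rows(con, rows, if_exists):
    """Hypothetical helper that mimics the 'replace' and 'append' modes."""
    if if_exists == "replace":
        # replace: discard the existing table and its rows first
        con.execute("DROP TABLE IF EXISTS amazon")
    # create the table when it is missing (always the case after a drop)
    con.execute('CREATE TABLE IF NOT EXISTS amazon ("Date" TEXT, "Close" REAL)')
    con.executemany("INSERT INTO amazon VALUES (?, ?)", rows)
    con.commit()


def count_rows(con):
    return con.execute("SELECT count(*) FROM amazon").fetchone()[0]


con = sqlite3.connect(":memory:")
write_rows(con, rows_2020, "replace")
count_after_replace = count_rows(con)   # 3
write_rows(con, rows_2021, "append")
count_after_append = count_rows(con)    # 4: earlier rows are kept
write_rows(con, rows_2020, "replace")
count_after_rewrite = count_rows(con)   # 3: earlier rows are gone
con.close()
```

Running the `replace` cell twice is therefore safe, whereas running the `append` cell twice would duplicate the 2021 rows.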

## There is more

### Writing to MySQL 

In [13]:
engine = create_engine("mysql+pymysql://root:password@localhost:3306/stocks")

# uncomment if you need to pull the data again
# amzn_df_2020 = web.get_data_yahoo('AMZN', 
#                                   start='2020-01-01', 
#                                   end='2020-12-31')

amzn_df_2020.to_sql('amazon',
                    engine,
                    if_exists='replace')

query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(253,)

In [14]:
query = '''
select * from amazon;
'''
engine.execute(query).fetchone()

(datetime.datetime(2020, 1, 2, 0, 0), 94.90049743652344, 93.2074966430664, 93.75, 94.90049743652344, 80580000, 94.90049743652344)

In [15]:
amzn_df_2021.to_sql('amazon',
                    engine,
                    if_exists='append')

query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(356,)

# Recipe 2: Storing Data to MongoDB

In the *Reading data from a document database* recipe in **Chapter 3**, *Reading Time Series Data from Databases*, you installed `pymongo`. For this recipe, you will be using that same library again.

* To install using Conda, run the following:

```
conda install -c anaconda pymongo -y
```

* To install using pip, run the following:

```
python -m pip install pymongo
```

In [16]:
import pandas as pd
from pymongo import MongoClient

In [17]:
client = MongoClient('mongodb://localhost:27017')

In [18]:
db = client['stocks']
collection = db['amazon']

In [19]:
amzn_df_2020

| Date | High | Low | Open | Close | Volume | Adj Close |
|---|---|---|---|---|---|---|
| 2020-01-02 | 94.900497 | 93.207497 | 93.750000 | 94.900497 | 80580000 | 94.900497 |
| 2020-01-03 | 94.309998 | 93.224998 | 93.224998 | 93.748497 | 75288000 | 93.748497 |
| 2020-01-06 | 95.184502 | 93.000000 | 93.000000 | 95.143997 | 81236000 | 95.143997 |
| 2020-01-07 | 95.694504 | 94.601997 | 95.224998 | 95.343002 | 80898000 | 95.343002 |
| 2020-01-08 | 95.550003 | 94.321999 | 94.902000 | 94.598503 | 70160000 | 94.598503 |
| ... | ... | ... | ... | ... | ... | ... |
| 2020-12-24 | 160.100006 | 158.449997 | 159.695007 | 158.634506 | 29038000 | 158.634506 |
| 2020-12-28 | 165.199997 | 158.634506 | 159.699997 | 164.197998 | 113736000 | 164.197998 |
| 2020-12-29 | 167.532501 | 164.061005 | 165.496994 | 166.100006 | 97458000 | 166.100006 |
| 2020-12-30 | 167.104996 | 164.123505 | 167.050003 | 164.292496 | 64186000 | 164.292496 |


In [20]:
amzn_records = amzn_df_2020.reset_index().to_dict(orient='records')

In [21]:
len(amzn_records)

253

In [22]:
#amzn_df_2020.reset_index().to_dict()

In [23]:
amzn_records[0:1]

[{'Date': Timestamp('2020-01-02 00:00:00'),
  'High': 94.90049743652344,
  'Low': 93.2074966430664,
  'Open': 93.75,
  'Close': 94.90049743652344,
  'Volume': 80580000,
  'Adj Close': 94.90049743652344}]

In [24]:
collection.insert_many(amzn_records)

<pymongo.results.InsertManyResult at 0x7f8e7cfd1f80>

In [25]:
client.list_database_names()

['admin', 'config', 'local', 'stocks']

In [26]:
db.list_collection_names()

['amazon']

In [27]:
collection.find_one()

{'_id': ObjectId('62a1ad56ccf4b41da835bf39'),
 'Date': datetime.datetime(2020, 1, 2, 0, 0),
 'High': 94.90049743652344,
 'Low': 93.2074966430664,
 'Open': 93.75,
 'Close': 94.90049743652344,
 'Volume': 80580000,
 'Adj Close': 94.90049743652344}

In [28]:
# filter for documents dated after August 1, 2020
# and retrieve the first match
import datetime
collection.find_one({'Date': {'$gt': datetime.datetime(2020, 8,1)}})

{'_id': ObjectId('62a1ad56ccf4b41da835bfcc'),
 'Date': datetime.datetime(2020, 8, 3, 0, 0),
 'High': 159.1999969482422,
 'Low': 155.1999969482422,
 'Open': 159.02549743652344,
 'Close': 155.59449768066406,
 'Volume': 101494000,
 'Adj Close': 155.59449768066406}

In [29]:
collection.count_documents({})

253

### InsertOneResult

In [30]:
one_record = amzn_df_2021.reset_index().iloc[0].to_dict()
one_record

{'Date': Timestamp('2021-01-04 00:00:00'),
 'High': 163.60000610351562,
 'Low': 157.2010040283203,
 'Open': 163.5,
 'Close': 159.3314971923828,
 'Volume': 88228000,
 'Adj Close': 159.3314971923828}

In [31]:
result_id = collection.insert_one(one_record)

In [32]:
result_id

<pymongo.results.InsertOneResult at 0x7f8e7e740c40>

In [33]:
result_id.inserted_id

ObjectId('62a1ad5fccf4b41da835c036')

In [34]:
list(collection.find({'Date': {'$gt': datetime.datetime(2020, 8,1)}}, {'Close': 1 }))[0:10]

[{'_id': ObjectId('62a1ad56ccf4b41da835bfcc'), 'Close': 155.59449768066406},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfcd'), 'Close': 156.94149780273438},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfce'), 'Close': 160.25149536132812},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfcf'), 'Close': 161.25},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd0'), 'Close': 158.3730010986328},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd1'), 'Close': 157.4080047607422},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd2'), 'Close': 154.0334930419922},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd3'), 'Close': 158.11199951171875},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd4'), 'Close': 158.05099487304688},
 {'_id': ObjectId('62a1ad56ccf4b41da835bfd5'), 'Close': 157.4010009765625}]

## There is more

### MongoDB Bucketing
* Bucketing strategy page 135

In [35]:
bucket = db['stocks_bucket']
amzn_df_2020['month'] = amzn_df_2020.index.month

In [36]:
for month in amzn_df_2020.index.month.unique():
    record = {}
    record['month'] = month
    record['symbol'] = 'AMZN'
    record['price'] = list(amzn_df_2020[amzn_df_2020['month'] == month]['Close'].values)
    bucket.insert_many([record])

In [37]:
bucket.count_documents({})

12

In [38]:
bucket.find_one({'month': 6})

{'_id': ObjectId('62a1ad66ccf4b41da835c03c'),
 'month': 6,
 'symbol': 'AMZN',
 'price': [123.552001953125,
  123.62049865722656,
  123.91999816894531,
  123.02999877929688,
  124.1500015258789,
  126.2030029296875,
  130.04299926757812,
  132.37249755859375,
  127.89800262451172,
  127.2509994506836,
  128.63400268554688,
  130.7635040283203,
  132.0489959716797,
  132.69900512695312,
  133.75050354003906,
  135.6909942626953,
  138.2205047607422,
  136.72000122070312,
  137.72900390625,
  134.64349365234375,
  134.0189971923828,
  137.9409942626953]}
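
The bucketing idea is to store one document per month, with all of that month's closing prices in a single array, instead of one document per trading day. The grouping step can be sketched without pandas or a running MongoDB instance. The snippet below uses a handful of closing prices copied from the outputs above; `prices_by_month` and `bucket_docs` are illustrative names, not part of the recipe.

```python
from collections import defaultdict
from datetime import date

# (trading day, closing price) pairs taken from the DataFrame output above
closes = [
    (date(2020, 1, 2), 94.900497),
    (date(2020, 1, 3), 93.748497),
    (date(2020, 12, 24), 158.634506),
    (date(2020, 12, 28), 164.197998),
]

# Group the closing prices by calendar month
prices_by_month = defaultdict(list)
for day, close in closes:
    prices_by_month[day.month].append(close)

# Build one bucket document per month, mirroring the fields used above
bucket_docs = [
    {"month": month, "symbol": "AMZN", "price": prices}
    for month, prices in sorted(prices_by_month.items())
]

for doc in bucket_docs:
    print(doc)
```

With the full 2020 DataFrame this grouping yields the 12 documents counted above. A list built this way could also be passed to `insert_many` in a single call rather than one call per month.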

### MongoDB Time Series Collection
* Introduction to "Time Series Collection"

> "As of MongoDB 5.0, the database natively supports time series data by creating a special collection type called time series collection" (Chapter 5, page 136)

In [39]:
ts = db.create_collection(name = "stocks_ts", 
                         capped =  False,
                         timeseries = {"timeField": "date", 
                                       "metaField": "metadata"})

In [40]:
[i for i in db.list_collections() if i['name'] =='stocks_ts']

[{'name': 'stocks_ts',
  'type': 'timeseries',
  'options': {'timeseries': {'timeField': 'date',
    'metaField': 'metadata',
    'granularity': 'seconds',
    'bucketMaxSpanSeconds': 3600}},
  'info': {'readOnly': False}}]

In [41]:
cols = ['Close']
records = []
# iterrows() yields (index label, row) pairs; the index label is the date
for date, row in amzn_df_2020[cols].iterrows():
    records.append(
        {'metadata': 
                 {'ticker': 'AMZN', 'type': 'close'},
         'date': date,
         'price': row['Close']})

In [42]:
records[0:1]

[{'metadata': {'ticker': 'AMZN', 'type': 'close'},
  'date': Timestamp('2020-01-02 00:00:00'),
  'price': 94.90049743652344}]

In [43]:
ts.insert_many(records)

<pymongo.results.InsertManyResult at 0x7f8e7d1aff00>

In [44]:
ts.find_one({})

{'date': datetime.datetime(2020, 1, 2, 0, 0),
 'metadata': {'ticker': 'AMZN', 'type': 'close'},
 'price': 94.90049743652344,
 '_id': ObjectId('62a1ad6eccf4b41da835c043')}

# Recipe 3: Storing Data to Time Series Database (InfluxDB)

You will be using the **ExtraSensory** dataset, a mobile sensory dataset made available by the University of California, San Diego, which you can download here: http://extrasensory.ucsd.edu/.  

There are 278 columns in the dataset. You will be using two of these columns to demonstrate how to write to InfluxDB. You will be using the timestamp (date ranges from 2015-07-23 to 2016-06-02, covering 152 days) and the watch accelerometer reading (measured in milli G-forces or milli-G).

Before you can interact with InfluxDB using Python, you will need to install the InfluxDB Python library. We will be working with InfluxDB 2.x, so make sure you are installing `influxdb-client` 1.21.0 or later (and not `influxdb-python`, which supports InfluxDB up to 1.8.x).

* You can install the library with `pip` by running the following command:
```
pip install 'influxdb-client[extra]'
```

In [45]:
from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from  pathlib import Path

In [46]:
import influxdb_client
influxdb_client.__version__

'1.29.1'

In [47]:
path = Path('../../datasets/Ch5/ExtraSensory/')

In [48]:
file = '0A986513-7828-4D53-AA1F-E02D6DF9561B.features_labels.csv.gz'

In [49]:
columns = ['timestamp',
           'watch_acceleration:magnitude_stats:mean']

df = pd.read_csv(path.joinpath(file),
                usecols=columns)
# backfill missing values; equivalent to fillna(method='backfill'),
# which newer pandas versions deprecate
df = df.bfill()
df.columns = ['timestamp','acc']
df.shape


(3960, 2)

In [50]:
df.head()

|  | timestamp | acc |
|---|---|---|
| 0 | 1449601597 | 995.369977 |
| 1 | 1449601657 | 995.369977 |
| 2 | 1449601717 | 995.369977 |
| 3 | 1449601777 | 996.406005 |
| 4 | 1449601855 | 1034.180063 |


In [51]:
df['timestamp'] = pd.to_datetime(df['timestamp'],
                                  origin='unix',
                                  unit='s',
                                  utc=True)

In [52]:
df.head()

|  | timestamp | acc |
|---|---|---|
| 0 | 2015-12-08 19:06:37+00:00 | 995.369977 |
| 1 | 2015-12-08 19:07:37+00:00 | 995.369977 |
| 2 | 2015-12-08 19:08:37+00:00 | 995.369977 |
| 3 | 2015-12-08 19:09:37+00:00 | 996.406005 |
| 4 | 2015-12-08 19:10:55+00:00 | 1034.180063 |
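
The raw `timestamp` column holds Unix epoch seconds, and `pd.to_datetime` with `unit='s'` and `utc=True` turns them into timezone-aware UTC datetimes. You can check the conversion by hand with the standard library alone. This small sketch uses three of the epoch values from the output above:

```python
from datetime import datetime, timezone

# Epoch seconds copied from the first df.head() output
epoch_seconds = [1449601597, 1449601657, 1449601855]

# Interpret each value as seconds since 1970-01-01 00:00:00 UTC
as_utc = [datetime.fromtimestamp(s, tz=timezone.utc) for s in epoch_seconds]

for s, ts in zip(epoch_seconds, as_utc):
    print(s, "->", ts.isoformat(sep=" "))
```

Passing `tz=timezone.utc` matters here: without it, `fromtimestamp` returns a naive datetime in your machine's local time zone.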


In [53]:
df.set_index('timestamp', inplace=True)

In [54]:
df.head()

| timestamp | acc |
|---|---|
| 2015-12-08 19:06:37+00:00 | 995.369977 |
| 2015-12-08 19:07:37+00:00 | 995.369977 |
| 2015-12-08 19:08:37+00:00 | 995.369977 |
| 2015-12-08 19:09:37+00:00 | 996.406005 |
| 2015-12-08 19:10:55+00:00 | 1034.180063 |


In [55]:
bucket = "sensor"
org = "my-org"
token = "WXT1Hkn-Hg3FGvKChg4UQ2IW2c2_zprqbj63A1GntGVVQIZ2wZP8egDSD91MH_56sM8LbheJ1WZjM1iNI_60NQ=="
client = InfluxDBClient(url="http://localhost:8086", token=token)

In [56]:
writer = client.write_api(WriteOptions(SYNCHRONOUS,
                     batch_size=500,
                     max_retries=5_000))

writer.write(bucket=bucket,
             org=org,
             record=df,
             write_precision='ns',
             data_frame_measurement_name='acc',
             data_frame_tag_columns=[])
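
InfluxDB ingests points as line protocol text of the form `measurement field=value timestamp` (tags, when present, follow the measurement name). The client library does this serialization for you. The hypothetical `to_line_protocol` helper below only illustrates roughly what the first row of `df` becomes when written with the measurement name `acc`, no tags, and nanosecond precision.

```python
from datetime import datetime, timezone


def to_line_protocol(measurement, field, value, ts):
    """Hypothetical helper: format one point with a nanosecond timestamp."""
    # Integer arithmetic avoids float rounding at nanosecond scale.
    # Assumes whole-second timestamps, as in this dataset.
    epoch_ns = int(ts.timestamp()) * 1_000_000_000
    return f"{measurement} {field}={value} {epoch_ns}"


# First row of the DataFrame shown above
ts = datetime(2015, 12, 8, 19, 6, 37, tzinfo=timezone.utc)
line = to_line_protocol("acc", "acc", 995.369977, ts)
print(line)
```

The measurement and the field are both called `acc` here because the DataFrame column is named `acc` and the same name is passed as `data_frame_measurement_name`.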

In [57]:
query = '''
from(bucket: "sensor")
|> range(start: 2015-12-08)
'''

result = client.query_api()

influx_df = result.query_data_frame(
    org=org,
    query=query,
    data_frame_index='_time')

In [58]:
influx_df.shape

(3960, 7)

In [59]:
influx_df.columns

Index(['result', 'table', '_start', '_stop', '_value', '_field',
       '_measurement'],
      dtype='object')

In [60]:
influx_df.head()

| _time | result | table | _start | _stop | _value | _field | _measurement |
|---|---|---|---|---|---|---|---|
| 2015-12-08 19:06:37+00:00 | _result | 0 | 2015-12-08 00:00:00+00:00 | 2022-06-09 08:21:14.598248+00:00 | 995.369977 | acc | acc |
| 2015-12-08 19:07:37+00:00 | _result | 0 | 2015-12-08 00:00:00+00:00 | 2022-06-09 08:21:14.598248+00:00 | 995.369977 | acc | acc |
| 2015-12-08 19:08:37+00:00 | _result | 0 | 2015-12-08 00:00:00+00:00 | 2022-06-09 08:21:14.598248+00:00 | 995.369977 | acc | acc |
| 2015-12-08 19:09:37+00:00 | _result | 0 | 2015-12-08 00:00:00+00:00 | 2022-06-09 08:21:14.598248+00:00 | 996.406005 | acc | acc |
| 2015-12-08 19:10:55+00:00 | _result | 0 | 2015-12-08 00:00:00+00:00 | 2022-06-09 08:21:14.598248+00:00 | 1034.180063 | acc | acc |


In [61]:
influx_df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 3960 entries, 2015-12-08 19:06:37+00:00 to 2015-12-11 18:48:27+00:00
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype                  
---  ------        --------------  -----                  
 0   result        3960 non-null   object                 
 1   table         3960 non-null   int64                  
 2   _start        3960 non-null   datetime64[ns, tzutc()]
 3   _stop         3960 non-null   datetime64[ns, tzutc()]
 4   _value        3960 non-null   float64                
 5   _field        3960 non-null   object                 
 6   _measurement  3960 non-null   object                 
dtypes: datetime64[ns, tzutc()](2), float64(1), int64(1), object(3)
memory usage: 247.5+ KB


In [62]:
writer

<influxdb_client.client.write_api.WriteApi at 0x7f8e7d470940>

In [63]:
writer.close()

In [64]:
client.close()

### There is more
**With Clause**

In [65]:
with InfluxDBClient(url="http://localhost:8086", token=token) as client:
    with client.write_api(WriteOptions(SYNCHRONOUS,
                     batch_size=500,
                     max_retries=5_000)) as writer:
        
        writer.write(bucket=bucket,
                        org=org,
                        record=df,
                        write_precision='ns',
                        data_frame_measurement_name='acc',
                        data_frame_tag_columns=[])

# Recipe 4: Storing Data in Snowflake

To connect to Snowflake, you will need to install the Snowflake Python connector.

* To install using `Conda`, run the following:

```
conda install -c conda-forge snowflake-sqlalchemy snowflake-connector-python
```
* To install using `pip`, run the following:

```
pip install "snowflake-connector-python[pandas]"
pip install --upgrade snowflake-sqlalchemy
```

### Using `pandas.to_sql`

In [66]:
import pandas as pd
from snowflake.connector.pandas_tools import pd_writer
from configparser import ConfigParser
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

In [67]:
# uncomment if you need to recreate the DataFrame
# path = Path('../../datasets/Ch5/ExtraSensory/')
# file = '0A986513-7828-4D53-AA1F-E02D6DF9561B.features_labels.csv.gz'
# columns = ['timestamp',
#            'watch_acceleration:magnitude_stats:mean']

# df = pd.read_csv(path.joinpath(file),
#                 usecols=columns)
# df = df.fillna(method='backfill')
# df.columns = ['timestamp','acc']
# df['timestamp'] = pd.to_datetime(df['timestamp'],
#                                   origin='unix',
#                                   unit='s',
#                                   utc=True)

In [68]:
df = df.reset_index()
df.columns = df.columns.str.upper()
df.head()

|  | TIMESTAMP | ACC |
|---|---|---|
| 0 | 2015-12-08 19:06:37+00:00 | 995.369977 |
| 1 | 2015-12-08 19:07:37+00:00 | 995.369977 |
| 2 | 2015-12-08 19:08:37+00:00 | 995.369977 |
| 3 | 2015-12-08 19:09:37+00:00 | 996.406005 |
| 4 | 2015-12-08 19:10:55+00:00 | 1034.180063 |


In [69]:
# connector.paramstyle='qmark'
config = ConfigParser()
config.read('snow.cfg')
config.sections()
params = dict(config['SNOWFLAKE'])
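
The contents of `snow.cfg` are not shown in this notebook. Here is a plausible layout, assuming a single `[SNOWFLAKE]` section whose option names match the keyword arguments accepted by `URL` and `connector.connect`. Every value is a placeholder, and the exact set of keys you need depends on your account. The sketch parses the text with `read_string` so it runs without a file on disk.

```python
from configparser import ConfigParser

# Hypothetical contents for snow.cfg; every value is a placeholder.
example_cfg = """
[SNOWFLAKE]
USER = <your_username>
PASSWORD = <your_password>
ACCOUNT = <your_account_identifier>
WAREHOUSE = <your_warehouse>
DATABASE = <your_database>
SCHEMA = <your_schema>
ROLE = <your_role>
"""

example_config = ConfigParser()
example_config.read_string(example_cfg)

# ConfigParser lowercases option names, so the keys come back as
# 'user', 'password', 'account', and so on.
example_params = dict(example_config["SNOWFLAKE"])
print(sorted(example_params))
```

Note that `ConfigParser` treats `%` as an interpolation character by default, so a password containing `%` would need to be written as `%%` or read with `ConfigParser(interpolation=None)`.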

In [70]:
url = URL(**params)
engine = create_engine(url)
# connection = engine.connect()

In [71]:
try:
    df.to_sql('sensor',
              engine,
              index=False,
              method=pd_writer,
              if_exists='replace')
except Exception as e:
    # report the underlying error instead of silently swallowing it
    print('failed to write:', e)

In [72]:
query = 'SELECT * FROM SENSOR;'
try:
    snow_df = pd.read_sql(query, engine, index_col='timestamp')
    snow_df.info()
except Exception as e:
    # report the underlying error instead of silently swallowing it
    print('failed to query:', e)

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 3960 entries, 2015-12-08 19:06:37 to 2015-12-11 18:48:27
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   acc     3960 non-null   float64
dtypes: float64(1)
memory usage: 61.9 KB


In [73]:
snow_df.head()

| timestamp | acc |
|---|---|
| 2015-12-08 19:06:37 | 995.369977 |
| 2015-12-08 19:07:37 | 995.369977 |
| 2015-12-08 19:08:37 | 995.369977 |
| 2015-12-08 19:09:37 | 996.406005 |
| 2015-12-08 19:10:55 | 1034.180063 |


## There is more
### Using `write_pandas`

In [74]:
from snowflake import connector
from snowflake.connector.pandas_tools import pd_writer, write_pandas

con = connector.connect(**params)
cursor = con.cursor()

# delete records from the previous write
cursor.execute('DELETE FROM sensor;')

# uncomment if the dataframe has an index and lower case column names
# df = df.reset_index()
# df.columns = df.columns.str.upper()

success, nchunks, nrows, copy_into = write_pandas(con, df, table_name='SENSOR')

In [75]:
print('success: ', success)
print('number of chunks: ', nchunks)
print('number of rows: ', nrows)
print('COPY INTO output', copy_into)

success:  True
number of chunks:  1
number of rows:  3960
COPY INTO output [('zztdm/file0.txt', 'LOADED', 3960, 3960, 1, 0, None, None, None, None)]
