# **Spark Streaming**

In [1]:
# Install Java 8 (headless JDK), which Spark needs to run.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
 
# Download Spark 3.0.0 built for Hadoop 3.2 (change the version number if needed;
# the tar command and SPARK_HOME below must then be changed to match).
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
 
# Unpack the Spark archive into the current folder.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
 
# Point JAVA_HOME and SPARK_HOME at the installations above.
# NOTE(review): the SPARK_HOME path assumes the archive was unpacked in /content — confirm.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
 

# Install findspark using pip.
# NOTE(review): findspark is not imported in any of the visible cells — confirm it is needed.
!pip install -q findspark

In [2]:
# Install the PySpark Python package.
# NOTE(review): the version is not pinned; the recorded output shows 3.0.1 being installed
# while SPARK_HOME points at a 3.0.0 distribution — consider pinning to a matching version.
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 46.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=6e6e4c4be285b3180d3e391588eb190001b4f29fe53934f1deac05481f22fda9
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark import SQLContext

In [4]:
import numpy as np
import pandas as pd

# Load the two saved NumPy arrays (Coinbase trades and Binance orders, judging by the file names).
# NOTE(review): allow_pickle=True unpickles arbitrary Python objects, which can execute code;
# only load files from a trusted source. The absolute /content paths are hard-coded.
cb_tr = np.load('/content/coinbase_trade_array.npy',allow_pickle=True)
bn_order = np.load('/content/binance_order_array.npy',allow_pickle=True)

In [5]:
from pyspark import SparkContext
# Start a Spark context with master "local" and application name "First App",
# then wrap it in a SQLContext.
# NOTE(review): sqlContext is not referenced in any of the visible cells.
sc = SparkContext("local", "First App")
sqlContext = SQLContext(sc)

In [6]:
from pyspark.streaming import StreamingContext
# Create a streaming context on top of sc with a batch duration of 1 (seconds).
# NOTE(review): the visible cells only use ssc.sparkContext; no stream is ever started.
ssc = StreamingContext(sc, 1)

In [7]:
# Distribute the two in-memory arrays as RDDs, one element per array row.
cb_trade = ssc.sparkContext.parallelize(cb_tr)
binance_order = ssc.sparkContext.parallelize(bn_order)

In [8]:
# Left-outer-join Binance orders with Coinbase trades on a string timestamp key.
#   * Binance key: str(order['time']) without its last three characters
#     (presumably milliseconds -> seconds; TODO confirm); the value is a constant 0.
#   * Coinbase key: str(trade[0]) without its last two characters; the value is trade[4]
#     (presumably the trade price; TODO confirm against the array layout).
# Orders with no matching trade key get None on the Coinbase side.
merged = (
    binance_order
    .map(lambda order: (str(order['time'])[:-3], 0))
    .leftOuterJoin(
        cb_trade.map(lambda trade: (str(trade[0])[:-2], trade[4]))
    )
)

In [9]:
# Reduce each joined (key, (0, coinbase_value)) pair to [key, coinbase_value];
# the value is None where the left outer join found no Coinbase match.
# NOTE(review): this rebinds `merged`, so re-running only this cell applies the map
# to rows that were already reshaped.
merged = merged.map(lambda x: [x[0],x[1][1]])

In [10]:
# Preview the first 50 joined rows.
merged.take(50)

[['1609195950', None],
 ['1609195951', None],
 ['1609195953', None],
 ['1609195954', None],
 ['1609195957', None],
 ['1609195962', None],
 ['1609195964', None],
 ['1609195967', None],
 ['1609195968', None],
 ['1609195970', None],
 ['1609195971', None],
 ['1609195976', None],
 ['1609195977', None],
 ['1609195982', None],
 ['1609195984', None],
 ['1609195987', None],
 ['1609195990', None],
 ['1609195992', None],
 ['1609195993', None],
 ['1609195995', None],
 ['1609195996', None],
 ['1609195998', None],
 ['1609196000', None],
 ['1609196001', None],
 ['1609196003', None],
 ['1609196008', None],
 ['1609196015', None],
 ['1609196017', None],
 ['1609196018', None],
 ['1609196020', None],
 ['1609196021', None],
 ['1609196023', None],
 ['1609196028', None],
 ['1609196032', None],
 ['1609196037', None],
 ['1609196040', 0.02707],
 ['1609196042', None],
 ['1609196046', None],
 ['1609196055', None],
 ['1609196058', None],
 ['1609196061', None],
 ['1609196063', None],
 ['1609196066', None],
 ['16091

In [11]:
def arrange(iterator):
    """Forward-fill missing values within one partition.

    Each record is indexed as ``record[0]`` (key) and ``record[1]`` (value).
    A ``None`` value is replaced by the most recent non-``None`` value seen
    earlier in the same partition, or by ``0`` when no value has been seen yet.

    Args:
        iterator: Iterable of two-item records (lists or tuples), such as the
            per-partition iterator handed over by ``RDD.mapPartitions``.

    Returns:
        A new list of ``[key, value]`` lists in input order. The input records
        are left untouched, so tuple records are accepted as well.
    """
    last_seen = 0  # placeholder used until the first real value appears
    filled = []

    for record in iterator:
        value = record[1]
        if value is None:
            value = last_seen
        else:
            last_seen = value
        # Build a fresh row instead of writing back into the caller's record.
        filled.append([record[0], value])

    return filled

In [12]:
# Replace None values with the last seen value, partition by partition.
# NOTE(review): the fill follows each partition's row order; the join that produced
# `merged` presumably does not guarantee time order — confirm, or sort by key first.
cb_arr = merged.mapPartitions(arrange)

In [13]:
# Preview the first 20 rows after the None values were filled.
cb_arr.take(20)

[['1609195950', 0],
 ['1609195951', 0],
 ['1609195953', 0],
 ['1609195954', 0],
 ['1609195957', 0],
 ['1609195962', 0],
 ['1609195964', 0],
 ['1609195967', 0],
 ['1609195968', 0],
 ['1609195970', 0],
 ['1609195971', 0],
 ['1609195976', 0],
 ['1609195977', 0],
 ['1609195982', 0],
 ['1609195984', 0],
 ['1609195987', 0],
 ['1609195990', 0],
 ['1609195992', 0],
 ['1609195993', 0],
 ['1609195995', 0]]

In [14]:
def arrange_last(iterator):
    """Replace placeholder zeros with the first non-zero value of the partition.

    Every record whose value equals ``0`` gets the first non-zero value found
    in the same partition. When the partition contains no non-zero value the
    zeros are kept as they are.

    Args:
        iterator: Iterable of two-item records indexed as ``record[0]`` (key)
            and ``record[1]`` (value). It may be a one-shot iterator.

    Returns:
        A new list of ``[key, value]`` lists in input order. The input records
        are not modified.
    """
    # Materialize first: the data is scanned twice and a one-shot iterator
    # would be exhausted after the first pass.
    rows = list(iterator)

    # Compare by value (==/!=) rather than identity; `is 0` depends on
    # interpreter caching and misses zeros of other numeric types.
    non_zero = [row[1] for row in rows if row[1] != 0]
    if not non_zero:
        # Nothing to fill from; return copies of the rows unchanged.
        return [[row[0], row[1]] for row in rows]

    fill_value = non_zero[0]
    return [[row[0], fill_value if row[1] == 0 else row[1]] for row in rows]

In [15]:
# Replace the remaining 0 placeholders with the first non-zero value of each partition.
last_cb = cb_arr.mapPartitions(arrange_last)

In [16]:
# Preview the first 200 rows after the zero placeholders were replaced.
last_cb.take(200)

[['1609195950', 0.02707],
 ['1609195951', 0.02707],
 ['1609195953', 0.02707],
 ['1609195954', 0.02707],
 ['1609195957', 0.02707],
 ['1609195962', 0.02707],
 ['1609195964', 0.02707],
 ['1609195967', 0.02707],
 ['1609195968', 0.02707],
 ['1609195970', 0.02707],
 ['1609195971', 0.02707],
 ['1609195976', 0.02707],
 ['1609195977', 0.02707],
 ['1609195982', 0.02707],
 ['1609195984', 0.02707],
 ['1609195987', 0.02707],
 ['1609195990', 0.02707],
 ['1609195992', 0.02707],
 ['1609195993', 0.02707],
 ['1609195995', 0.02707],
 ['1609195996', 0.02707],
 ['1609195998', 0.02707],
 ['1609196000', 0.02707],
 ['1609196001', 0.02707],
 ['1609196003', 0.02707],
 ['1609196008', 0.02707],
 ['1609196015', 0.02707],
 ['1609196017', 0.02707],
 ['1609196018', 0.02707],
 ['1609196020', 0.02707],
 ['1609196021', 0.02707],
 ['1609196023', 0.02707],
 ['1609196028', 0.02707],
 ['1609196032', 0.02707],
 ['1609196037', 0.02707],
 ['1609196040', 0.02707],
 ['1609196042', 0.02707],
 ['1609196046', 0.02707],
 ['160919605

In [17]:
def arranger(iterator):
    """Shift values one row earlier within a partition.

    Output row ``i`` keeps the key of input row ``i`` and takes the value of
    input row ``i + 1``. The last input row has no successor and is therefore
    dropped, so ``n`` input rows produce ``n - 1`` output rows (none for empty
    or single-row input).

    Args:
        iterator: Iterable of two-item records indexed as ``record[0]`` (key)
            and ``record[1]`` (value). It may be a one-shot iterator.

    Returns:
        A new list of ``[key, next_value]`` lists. The input records are not
        modified.
    """
    # Materialize so the rows can be paired with their successors even when
    # the caller passes a plain iterator rather than a list.
    rows = list(iterator)
    return [
        [current[0], following[1]]
        for current, following in zip(rows, rows[1:])
    ]

In [18]:
# Pair each key with the value of the following row (the last row of each partition is dropped).
final_cb = last_cb.mapPartitions(arranger)

In [19]:
# Preview the first 100 rows of the shifted series.
final_cb.take(100)

[['1609195950', 0.02707],
 ['1609195951', 0.02707],
 ['1609195953', 0.02707],
 ['1609195954', 0.02707],
 ['1609195957', 0.02707],
 ['1609195962', 0.02707],
 ['1609195964', 0.02707],
 ['1609195967', 0.02707],
 ['1609195968', 0.02707],
 ['1609195970', 0.02707],
 ['1609195971', 0.02707],
 ['1609195976', 0.02707],
 ['1609195977', 0.02707],
 ['1609195982', 0.02707],
 ['1609195984', 0.02707],
 ['1609195987', 0.02707],
 ['1609195990', 0.02707],
 ['1609195992', 0.02707],
 ['1609195993', 0.02707],
 ['1609195995', 0.02707],
 ['1609195996', 0.02707],
 ['1609195998', 0.02707],
 ['1609196000', 0.02707],
 ['1609196001', 0.02707],
 ['1609196003', 0.02707],
 ['1609196008', 0.02707],
 ['1609196015', 0.02707],
 ['1609196017', 0.02707],
 ['1609196018', 0.02707],
 ['1609196020', 0.02707],
 ['1609196021', 0.02707],
 ['1609196023', 0.02707],
 ['1609196028', 0.02707],
 ['1609196032', 0.02707],
 ['1609196037', 0.02707],
 ['1609196040', 0.02707],
 ['1609196042', 0.02707],
 ['1609196046', 0.02707],
 ['160919605

# **InfluxDB**

In [20]:
# Install the InfluxDB Python client package (version not pinned).
!pip install influxdb

Collecting influxdb
[?25l  Downloading https://files.pythonhosted.org/packages/01/95/3a72ea5e19df828d27af0a50092f2b24114a5f89922efb4d8a0960bf13ef/influxdb-5.3.1-py2.py3-none-any.whl (77kB)
[K     |████▏                           | 10kB 14.6MB/s eta 0:00:01[K     |████████▍                       | 20kB 20.6MB/s eta 0:00:01[K     |████████████▋                   | 30kB 12.8MB/s eta 0:00:01[K     |████████████████▉               | 40kB 11.1MB/s eta 0:00:01[K     |█████████████████████           | 51kB 5.5MB/s eta 0:00:01[K     |█████████████████████████▎      | 61kB 6.1MB/s eta 0:00:01[K     |█████████████████████████████▍  | 71kB 6.2MB/s eta 0:00:01[K     |████████████████████████████████| 81kB 4.1MB/s 
Installing collected packages: influxdb
Successfully installed influxdb-5.3.1


In [21]:
# Refresh the apt package lists, then install the InfluxDB package.
!sudo apt-get update && sudo apt-get install influxdb

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [41.5 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:14 http://ppa.launchpad.net/graphics-drivers/ppa

In [23]:
# Start the InfluxDB service so the client below can connect to it.
!sudo service influxdb start

 * Starting database influxd
   ...done.


In [24]:
from influxdb import InfluxDBClient
# Connect to the InfluxDB instance on localhost, port 8086.
client = InfluxDBClient(host='localhost', port=8086)

In [25]:
# Create the database that will hold the processed series.
client.create_database('coinbase_trade')

In [26]:
# List the databases to check that 'coinbase_trade' now exists.
client.get_list_database()

[{'name': '_internal'}, {'name': 'coinbase_trade'}]

In [31]:
def influx(partitions):
    """Format the rows of one partition as InfluxDB line-protocol strings.

    Each row becomes ``'coinbase_trade date=<row[0]>,price=<row[1]>'``: the
    measurement ``coinbase_trade`` with two fields, ``date`` and ``price``.
    Every row is emitted, including the last one of the partition.

    Args:
        partitions: Iterable of two-item records. ``row[0]`` must be a string
            (it is concatenated as-is); ``row[1]`` is converted with ``str``.
            It may be a one-shot iterator.

    Returns:
        A list with one line-protocol string per input row, in input order.
    """
    # NOTE(review): no timestamp is appended to the line, so the stored point
    # time is presumably assigned by the server at write time; the original
    # epoch value only survives as the `date` field — confirm this is intended.
    return [
        'coinbase_trade date=' + row[0] + ',' + 'price=' + str(row[1])
        for row in partitions
    ]

In [32]:
# Convert every row into an InfluxDB line-protocol string.
inf_final=final_cb.mapPartitions(influx)  

In [33]:
# Preview the first two line-protocol strings.
inf_final.take(2)

['coinbase_trade date=1609195950,price=0.02707',
 'coinbase_trade date=1609195951,price=0.02707']

In [34]:
# Write every formatted line to the 'coinbase_trade' database using the line protocol.
# NOTE(review): collect() pulls the whole RDD to the driver, and batch_size=1 sends one
# point per request. The lines carry no timestamp, so the stored `time` is presumably the
# server's write time (the query output further down shows 2021-01-03 write times).
# Confirm before raising batch_size: points without distinct timestamps may overwrite
# each other.
client.write_points(inf_final.collect(), database='coinbase_trade', time_precision='ms', batch_size=1, protocol='line')

True

In [35]:
# Make 'coinbase_trade' the default database for subsequent client calls.
client.switch_database('coinbase_trade')

In [37]:
# Read every point of the coinbase_trade measurement (database coinbase_trade,
# retention policy autogen) back into a pandas DataFrame.
# NOTE(review): the frame is named bin_trade although it is filled from the
# coinbase_trade measurement — confirm the name is intended.
q='select * from coinbase_trade.autogen.coinbase_trade'
bin_trade = pd.DataFrame(client.query(q).get_points())

In [38]:
# Display the points read back from InfluxDB.
bin_trade

Unnamed: 0,time,date,price
0,2021-01-03T14:20:10.031Z,1.609196e+09,0.02707
1,2021-01-03T14:20:10.051Z,1.609196e+09,0.02707
2,2021-01-03T14:20:10.058Z,1.609196e+09,0.02707
3,2021-01-03T14:20:10.066Z,1.609196e+09,0.02707
4,2021-01-03T14:20:10.072Z,1.609196e+09,0.02707
...,...,...,...
10043,2021-01-03T14:21:18.076Z,1.609239e+09,0.02725
10044,2021-01-03T14:21:18.084Z,1.609239e+09,0.02725
10045,2021-01-03T14:21:18.092Z,1.609239e+09,0.02725
10046,2021-01-03T14:21:18.098Z,1.609239e+09,0.02725
