# Preparing the environment

We will use environment variables to hold Portia's connection details (keys, URLs, etc.) and some Spark/PySpark settings.

Please refer to [this link](https://ipython.readthedocs.io/en/stable/interactive/magics.html#magic-env) for more information about Jupyter Magics for environment variables.

```bash
%env PYSPARK_PYTHON        = python3.6
%env PYSPARK_DRIVER_PYTHON = python3.6
```

Alternatively, you can set the environment variables programmatically from Python:

```python
import os 
os.environ["PYSPARK_PYTHON"]        = "python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3.6"
```
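If the variables may already be set by the shell or the cluster, a gentler variant is possible. The sketch below assumes that existing values should take precedence, and uses `os.environ.setdefault` so it only fills in what is missing. The `defaults` dictionary is an illustrative name, not part of the original notebook.

```python
import os

# Assumed defaults; adjust the interpreter name to whatever is installed.
defaults = {
    "PYSPARK_PYTHON": "python3.6",
    "PYSPARK_DRIVER_PYTHON": "python3.6",
}

for name, value in defaults.items():
    # setdefault only writes the value when the variable is not already set
    os.environ.setdefault(name, value)
```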

Here we load the variables from a `.env` file with [python-dotenv](https://github.com/theskumar/python-dotenv). You can also [source](https://bash.cyberciti.biz/guide/Source_command) the same file in bash; python-dotenv ignores the `source` command inside the file.


In [1]:
# loading .env  
import dotenv  
import pathlib 
dotenv.load_dotenv(dotenv_path=pathlib.Path('../.env'))

True
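Since the later cells read several variables with `os.getenv`, it can help to check up front that they are all present. The following is a minimal sketch: the variable names are the ones used in this notebook, while `missing_env_vars` is a hypothetical helper introduced only for illustration.

```python
import os

# Variable names taken from the cells in this notebook.
REQUIRED_VARS = ("SPARK_MASTER_URL", "PORTIA_BASEURL", "PORTIA_KEY", "EDGEIDS")

def missing_env_vars(names, environ=os.environ):
    """Return the names that are unset or empty in the given mapping."""
    return [name for name in names if not environ.get(name)]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Running this right after loading `.env` surfaces a missing key before it turns into a less obvious error further down.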

## Setting up PySpark

In [2]:
import os 
from pyspark import SparkConf, SparkContext

In [3]:
conf  = SparkConf().setMaster(os.getenv("SPARK_MASTER_URL")).setAppName("portia-integration")
spark = SparkContext(conf = conf)

## Setting up Portia

In [12]:
from portiapy.portia  import PortiaApi
from portiapy.profile import ProfileStrategies
from portiapy.summary import SummaryStrategies

import json

portia = PortiaApi({
    'baseurl': os.getenv("PORTIA_BASEURL"),
    'authorization': os.getenv("PORTIA_KEY"),
    # the comparison already yields a bool; debug is on only for the exact string 'True'
    'debug': os.getenv("PORTIA_DEBUG") == 'True'
})

### Params

In [13]:
summary_params = {
#    'from': grafana_from_timestamp,
#    'to': grafana_to_timestamp,
    'order': '-1',
    #'precision': 'ms',
    'min': True,
    'max': True,
    'sum': False,
    'avg': False,
    'median': False,
    'mode'  : False,
    'stddev': False,
    'spread': False,
    'fill': 'null'
}

select_params = {
#     'from': None,
#     'to': None,
    'order': None,
    'precision': 'ms',
#     'limit': 1
}

In [14]:
edgeids   = os.getenv("EDGEIDS").split(',')
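The one-liner above raises an `AttributeError` when `EDGEIDS` is unset and keeps stray spaces around each ID. A more defensive version might look like the sketch below; `parse_id_list` is a hypothetical helper, and treating an unset variable as an empty list is an assumption.

```python
import os

def parse_id_list(raw):
    """Split a comma-separated string into non-empty, stripped items."""
    if not raw:
        return []
    return [item.strip() for item in raw.split(",") if item.strip()]

# Falls back to an empty list when EDGEIDS is not defined
edgeids = parse_id_list(os.getenv("EDGEIDS"))
```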

In [15]:
ports_df = portia.device(edgeids[0]).ports(last=True)
ports_df.humanize(datetime=True)

[portia-debug]: status: 200 | 0.0703 sec. | http://edge-portia-api.sa-east-1.elasticbeanstalk.com/v3/describe/device/KytTDwUp-j8yrsh8h/ports/last?precision=ms&sort=true
[portia-debug]: [{'header_timestamp': 1550086945535, 'dimension_thing_code': 1, 'port': '1'}, {'header_timestamp': 1550086925411, 'dimension_thing_code': 1, 'port': '2'}, {'header_timestamp': 1550086926807, 'dimension_thing_code': 2, 'port': '3'}]


|   | header_datetime | port | dimension_thing |
|---|---|---|---|
| 0 | seconds ago | 1 | SondaTU_v1 |
| 1 | seconds ago | 2 | SondaTU_v1 |
| 2 | seconds ago | 3 | SondaAirQ_v1 |
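The raw `header_timestamp` values in the debug output appear to be milliseconds since the Unix epoch, which matches the `precision=ms` query parameter. Under that assumption, they can be converted to timezone-aware datetimes with the standard library alone. `ms_to_datetime` is an illustrative helper, not part of portiapy.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def ms_to_datetime(ms):
    """Convert integer milliseconds since the Unix epoch to an aware UTC datetime."""
    # timedelta arithmetic avoids the float rounding of fromtimestamp(ms / 1000)
    return EPOCH + timedelta(milliseconds=ms)

# First header_timestamp from the debug output above
print(ms_to_datetime(1550086945535).isoformat())
```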


In [16]:
summary_df = portia.device(edgeids[0]).port(1).sensor(-1).summary(strategy=SummaryStrategies.PER_MONTH, interval=1, params=summary_params)
summary_df.head(10)

[portia-debug]: status: 400 | 0.0480 sec. | http://edge-portia-api.sa-east-1.elasticbeanstalk.com/v3/summary/device/KytTDwUp-j8yrsh8h/port/1/sensor/-1/permonth/1?order=-1&min=true&max=true&sum=false&avg=false&median=false&mode=false&stddev=false&spread=false&fill=null


Exception: couldn't retrieve data
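The request above came back with status 400 and the call raised an exception, which stops the cell. If the rest of the notebook should keep running, one option is to wrap such calls. This is a rough sketch: `fetch_or_none` and `failing_request` are hypothetical names, and catching the broad `Exception` is assumed only because that is what the output shows.

```python
def fetch_or_none(fetch):
    """Call fetch() and return its result, or None if it raises."""
    try:
        return fetch()
    except Exception as err:  # the notebook output shows a plain Exception
        print(f"request failed: {err}")
        return None

# Stand-in for a Portia call that fails, for illustration only
def failing_request():
    raise Exception("couldn't retrieve data")

result = fetch_or_none(failing_request)
```

With the real client, the summary call from the cell above would be passed in as a `lambda`, and a `None` result would signal that no DataFrame is available.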

In [14]:
df = portia.device(edgeids[0]).port(1).sensor(1).select(params=select_params, last=True)
df.head(3)

|   | header_timestamp | dimension_value | dimension_code | dimension_unity_code | dimension_thing_code | Unnamed: 5 |
|---|---|---|---|---|---|---|
| 0 | 1549037055689 | 35.7 | 1 | 1 | 1 | NaN |


In [15]:
df = portia.device(edgeids[0]).port(1).sensor(1).select(params=select_params)



In [16]:
df = portia.device(edgeids[0]).port(1).sensor(1).select(params=select_params)
df.head(5)

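|   | header_timestamp | dimension_value | dimension_code | dimension_unity_code | dimension_thing_code | Unnamed: 5 |
|---|---|---|---|---|---|---|
| 0 | 1549037055689 | 35.7 | 1 | 1 | 1 | NaN |
| 1 | 1549036995634 | 35.7 | 1 | 1 | 1 | NaN |
| 2 | 1549036935453 | 35.7 | 1 | 1 | 1 | NaN |
| 3 | 1549036875524 | 35.7 | 1 | 1 | 1 | NaN |
| 4 | 1549036334898 | 35.6 | 1 | 1 | 1 | NaN |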


In [12]:
df = portia.device(edgeids[0]).port(1).sensor(1).select(params=select_params)

print(df.plot(x='header_timestamp', y='dimension_value'))
df.shape

AxesSubplot(0.125,0.11;0.775x0.77)


(21622, 6)

## Integrating Portia with Spark RDDs

In [17]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
spark_df = sqlContext.createDataFrame(df)
spark_df.head(5)

[Row(header_timestamp=1549037055689, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1, Unnamed: 5=nan),
 Row(header_timestamp=1549036995634, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1, Unnamed: 5=nan),
 Row(header_timestamp=1549036935453, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1, Unnamed: 5=nan),
 Row(header_timestamp=1549036875524, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1, Unnamed: 5=nan),
 Row(header_timestamp=1549036334898, dimension_value=35.6, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1, Unnamed: 5=nan)]

In [18]:
spark_df.printSchema()

root
 |-- header_timestamp: long (nullable = true)
 |-- dimension_value: double (nullable = true)
 |-- dimension_code: long (nullable = true)
 |-- dimension_unity_code: long (nullable = true)
 |-- dimension_thing_code: long (nullable = true)
 |-- Unnamed: 5: double (nullable = true)



In [22]:
spark_df = spark_df.drop("Unnamed: 5")
spark_df.printSchema()

root
 |-- header_timestamp: long (nullable = true)
 |-- dimension_value: double (nullable = true)
 |-- dimension_code: long (nullable = true)
 |-- dimension_unity_code: long (nullable = true)
 |-- dimension_thing_code: long (nullable = true)



In [25]:
%%time
spark_df.head(5)


CPU times: user 5.13 ms, sys: 3.94 ms, total: 9.07 ms
Wall time: 470 ms


[Row(header_timestamp=1549037055689, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1),
 Row(header_timestamp=1549036995634, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1),
 Row(header_timestamp=1549036935453, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1),
 Row(header_timestamp=1549036875524, dimension_value=35.7, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1),
 Row(header_timestamp=1549036334898, dimension_value=35.6, dimension_code=1, dimension_unity_code=1, dimension_thing_code=1)]

In [26]:
%%time
spark_df.select("dimension_value","header_timestamp").orderBy("header_timestamp", ascending=False).show()

+---------------+----------------+
|dimension_value|header_timestamp|
+---------------+----------------+
|           35.7|   1549037055689|
|           35.7|   1549036995634|
|           35.7|   1549036935453|
|           35.7|   1549036875524|
|           35.6|   1549036334898|
|           35.3|   1549036034664|
|           35.3|   1549035914418|
|           35.3|   1549035794266|
|           35.3|   1549035433088|
|           35.3|   1549035373099|
|           35.3|   1549035313005|
|           35.2|   1549035132778|
|           35.3|   1549035072814|
|           35.2|   1549035012335|
|           35.3|   1549034832122|
|           35.3|   1549034711994|
|           35.3|   1549034591888|
|           35.3|   1549034531761|
|           34.9|   1549034171327|
|           34.9|   1549034111166|
+---------------+----------------+
only showing top 20 rows

CPU times: user 21.9 ms, sys: 1.62 ms, total: 23.5 ms
Wall time: 464 ms


In [27]:
%%time
from operator import add
# map every value to the same key, then sum all values for that key
spark_df.select("dimension_value").rdd.map(lambda row: (1, row['dimension_value'])).reduceByKey(add).collect()

CPU times: user 85.3 ms, sys: 32.5 ms, total: 118 ms
Wall time: 2.26 s
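To make the map and `reduceByKey` steps concrete, here is a plain-Python sketch of the same idea with no Spark involved. It uses only the first five readings shown earlier in this notebook, so the total it computes is for that small sample and not for the full dataset.

```python
from functools import reduce
from operator import add

# First five dimension_value readings shown earlier in this notebook
values = [35.7, 35.7, 35.7, 35.7, 35.6]

# map step: pair every value with the same key
pairs = [(1, v) for v in values]

# reduceByKey step: group by key, then fold each group with add
grouped = {}
for key, value in pairs:
    grouped.setdefault(key, []).append(value)
totals = [(key, reduce(add, vals)) for key, vals in grouped.items()]
```

Because every record shares the key `1`, the result is a single `(key, sum)` pair, which is the shape `collect()` returns in the Spark version.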


In [63]:
spark_df.select("dimension_value").rdd

MapPartitionsRDD[255] at javaToPython at <unknown>:0

In [28]:
spark_df.count()

21622