# 01 Identifying individuals, variables and categorical variables in a data set

In [12]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
from sklearn import preprocessing

In [13]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

import findspark

findspark.init()
from pyspark.context import SparkContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("statistics").master("local").getOrCreate()

In [89]:
import json
import uuid
from confluent_kafka import Producer
from ksql import KSQLAPI
bootstrap_servers='[::1]:9092'
topic='test2'
msg_count=5
client = KSQLAPI(url='http://localhost:8088', timeout=60000)

In [90]:
p = Producer({'bootstrap.servers': bootstrap_servers})

In [91]:
for i in zip(*dataset.values()):
    x = dict(zip(dataset.keys(), i))
    print(x)
    record_key = str(uuid.uuid4())
    record_value = json.dumps(x)

    p.produce(topic, value=record_value)
    p.poll(0)
    
p.flush()    

{'MID': 1, 'Drink': 'Brewed coffee', 'Type': 'Hot', 'Calories': 4, 'Sugars mg': 0, 'Caffein': 260}
{'MID': 2, 'Drink': 'Caffe latte', 'Type': 'Hot', 'Calories': 100, 'Sugars mg': 14, 'Caffein': 75}
{'MID': 3, 'Drink': 'Caffe mocha', 'Type': 'Hot', 'Calories': 170, 'Sugars mg': 27, 'Caffein': 95}
{'MID': 4, 'Drink': 'Cappuccino', 'Type': 'Hot', 'Calories': 60, 'Sugars mg': 8, 'Caffein': 75}
{'MID': 5, 'Drink': 'Iced brewed coffee', 'Type': 'Cold', 'Calories': 60, 'Sugars mg': 15, 'Caffein': 120}
{'MID': 6, 'Drink': 'Chai latte', 'Type': 'Hot', 'Calories': 120, 'Sugars mg': 25, 'Caffein': 60}


0

In [100]:
client.create_table(table_name='a7a',
                     columns_type=['MID bigint', 'Drink varchar', 'Type varchar', 'Calories varchar', "`Sugars mg` varchar", 'Caffein varchar'],
                     topic='test2',
                     key="MID",
                     value_format='JSON'
                    )

KSQLError: ('Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY', 40001, None)

In [80]:
client.ksql('show tables')

[{'@type': 'tables',
  'statementText': 'show tables;',
  'tables': [],

In [96]:
res = client.query("select * from a7a where Type='Cold'")
#print(next(res))

In [74]:
def parse_results(res):
    res = ''.join(res)
    res = res.replace('\n', '')
    res = res.replace('}{', '},{')
    res = '[' + res + ']'
    return json.loads(res)

In [75]:
parse_results(res)

RuntimeError: generator raised StopIteration

In [97]:
while True:
    try:
        print(next(res))
    except RuntimeError:
        print('')
        break

[{"header":{"queryId":"transient_A7A_2184474873978409361","schema":"`MID` BIGINT, `DRINK` STRING, `TYPE` STRING, `CALORIES` STRING, `Sugars mg` STRING, `CAFFEIN` STRING"}},

{"row":{"columns":[5,"Iced brewed coffee","Cold","60","15","120"]}},

]




In [53]:
df_raw = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', bootstrap_servers) \
  .option("startingOffsets", "earliest") \
  .option('subscribe', topic) \
  .load()
df_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [54]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [65]:
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [67]:
schema = StructType([StructField('data', StringType())])
schema

StructType(List(StructField(data,StringType,true)))

In [84]:

al = df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start().stop()

In [None]:
al

In [86]:

df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

StreamingQueryException: org/apache/spark/sql/connector/read/streaming/ReportsSourceMetrics
=== Streaming Query ===
Identifier: [id = 83d233d4-27f8-4087-9e94-2db05723bde3, runId = dfae8902-d26d-4533-95eb-cb895448e5c5]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE

[khanacademy](https://www.khanacademy.org/math/ap-statistics/analyzing-categorical-ap/analyzing-one-categorical-variable/v/identifying-individuals-variables-and-categorical-variables-in-a-data-set?modal=1)

![Identifying individuals, variables and categorical variables in a data set fig 1](./imgs/01-01-01.png)![Identifying individuals, variables and categorical variables in a data set fig 2](./imgs/01-01-02.png)

In [88]:
dataset = {
    "MID": [1,2,3,4,5,6],
    "Drink": [
        "Brewed coffee",
        "Caffe latte",
        "Caffe mocha",
        "Cappuccino",
        "Iced brewed coffee",
        "Chai latte",
    ],
    "Type": ["Hot", "Hot", "Hot", "Hot", "Cold", "Hot"],
    "Calories": [4, 100, 170, 60, 60, 120],
    "Sugars mg": [0, 14, 27, 8, 15, 25],
    "Caffein": [260, 75, 95, 75, 120, 60],
}

In [6]:
df = pd.DataFrame(dataset).set_index("Drink")
df

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg)
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Brewed coffee,Hot,4,0,260
Caffe latte,Hot,100,14,75
Caffe mocha,Hot,170,27,95
Cappuccino,Hot,60,8,75
Iced brewed coffee,Cold,60,15,120
Chai latte,Hot,120,25,60


In [7]:
sdf = spark.createDataFrame(zip(*dataset.values()), schema=list(dataset.keys()))
sdf.show()

+------------------+----+--------+----------+------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
+------------------+----+--------+----------+------------+
|     Brewed coffee| Hot|       4|         0|         260|
|       Caffe latte| Hot|     100|        14|          75|
|       Caffe mocha| Hot|     170|        27|          95|
|        Cappuccino| Hot|      60|         8|          75|
|Iced brewed coffee|Cold|      60|        15|         120|
|        Chai latte| Hot|     120|        25|          60|
+------------------+----+--------+----------+------------+



[('Drink', 'Brewed coffee', 'Hot', 4, 0, 260),
 ('Type', 'Caffe latte', 'Hot', 100, 14, 75),
 ('Calories', 'Caffe mocha', 'Hot', 170, 27, 95),
 ('Sugars (g)', 'Cappuccino', 'Hot', 60, 8, 75),
 ('Caffein (mg)', 'Iced brewed coffee', 'Cold', 60, 15, 120)]

In [15]:
producer.produce(json.dumps(dataset).encode('ascii'))

<pykafka.protocol.message.Message at 0x2dd6f80db30>

## Find and Replace

### Panads

In [6]:
df_fr = df.copy()
df_fr.replace({"Type": {"Hot": 1, "Cold": 0}}, inplace=True)
df_fr

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg)
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Brewed coffee,1,4,0,260
Caffe latte,1,100,14,75
Caffe mocha,1,170,27,95
Cappuccino,1,60,8,75
Iced brewed coffee,0,60,15,120
Chai latte,1,120,25,60


### Spark

In [7]:
sdf_fr = sdf.withColumn(
    "Type", F.when(F.col("Type") == "Hot", 1).when(F.col("Type") == "Cold", 0)
)
sdf_fr.show()

+------------------+----+--------+----------+------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
+------------------+----+--------+----------+------------+
|     Brewed coffee|   1|       4|         0|         260|
|       Caffe latte|   1|     100|        14|          75|
|       Caffe mocha|   1|     170|        27|          95|
|        Cappuccino|   1|      60|         8|          75|
|Iced brewed coffee|   0|      60|        15|         120|
|        Chai latte|   1|     120|        25|          60|
+------------------+----+--------+----------+------------+



## Label Encoding

### Pandas

In [8]:
df_lc = df.copy()
df_lc["Type_Hot"] = df_lc["Type"].astype("category").cat.codes
df_lc

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg),Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,Hot,4,0,260,1
Caffe latte,Hot,100,14,75,1
Caffe mocha,Hot,170,27,95,1
Cappuccino,Hot,60,8,75,1
Iced brewed coffee,Cold,60,15,120,0
Chai latte,Hot,120,25,60,1


### Sklearn

In [9]:
df_lc = df.copy()
df_lc["Type_Hot"] = preprocessing.LabelEncoder().fit_transform(df["Type"])
df_lc

Unnamed: 0_level_0,Type,Calories,Sugars (g),Caffein (mg),Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,Hot,4,0,260,1
Caffe latte,Hot,100,14,75,1
Caffe mocha,Hot,170,27,95,1
Cappuccino,Hot,60,8,75,1
Iced brewed coffee,Cold,60,15,120,0
Chai latte,Hot,120,25,60,1


### Spark

In [10]:
sdf_lc = StringIndexer(inputCol="Type", outputCol="Type_Cold").fit(sdf).transform(sdf)
sdf_lc.show()

+------------------+----+--------+----------+------------+---------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|
+------------------+----+--------+----------+------------+---------+
|     Brewed coffee| Hot|       4|         0|         260|      0.0|
|       Caffe latte| Hot|     100|        14|          75|      0.0|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|
|        Cappuccino| Hot|      60|         8|          75|      0.0|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|
|        Chai latte| Hot|     120|        25|          60|      0.0|
+------------------+----+--------+----------+------------+---------+



## One-Hot Encoding

### Pandas

In [11]:
df_ohc = pd.get_dummies(df, columns=["Type"], prefix="Type")
df_ohc

Unnamed: 0_level_0,Calories,Sugars (g),Caffein (mg),Type_Cold,Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,4,0,260,0,1
Caffe latte,100,14,75,0,1
Caffe mocha,170,27,95,0,1
Cappuccino,60,8,75,0,1
Iced brewed coffee,60,15,120,1,0
Chai latte,120,25,60,0,1


### Sklearn

In [12]:
x = (
    preprocessing.OneHotEncoder()
    .fit_transform(df["Type"].values.reshape(-1, 1))
    .toarray()
)
df_ohc = pd.concat(
    [
        df.drop(columns="Type").reset_index(),
        pd.DataFrame(x, columns=["Type_Cold", "Type_Hot"]),
    ],
    axis=1,
)
df_ohc.set_index("Drink")

Unnamed: 0_level_0,Calories,Sugars (g),Caffein (mg),Type_Cold,Type_Hot
Drink,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Brewed coffee,4,0,260,0.0,1.0
Caffe latte,100,14,75,0.0,1.0
Caffe mocha,170,27,95,0.0,1.0
Cappuccino,60,8,75,0.0,1.0
Iced brewed coffee,60,15,120,1.0,0.0
Chai latte,120,25,60,0.0,1.0


### Spark

In [13]:
sdf_ohc = (
    OneHotEncoder(inputCol="Type_Cold", outputCol="Type_Vec")
    .fit(sdf_lc)
    .transform(sdf_lc)
)
sdf_ohc.show()

+------------------+----+--------+----------+------------+---------+-------------+
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|     Type_Vec|
+------------------+----+--------+----------+------------+---------+-------------+
|     Brewed coffee| Hot|       4|         0|         260|      0.0|(1,[0],[1.0])|
|       Caffe latte| Hot|     100|        14|          75|      0.0|(1,[0],[1.0])|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|(1,[0],[1.0])|
|        Cappuccino| Hot|      60|         8|          75|      0.0|(1,[0],[1.0])|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|    (1,[],[])|
|        Chai latte| Hot|     120|        25|          60|      0.0|(1,[0],[1.0])|
+------------------+----+--------+----------+------------+---------+-------------+

