# Data Engineer

## Import Libraries

In [104]:
import pandas as pd
import numpy as np
from elasticsearch import Elasticsearch, helpers 

## Load Dataset

In [105]:
# load dataset and limit
df = pd.read_csv("retail.csv", encoding='ISO-8859-1')

# Randomly sample 10,000 rows
df = df.sample(n=50000, random_state=43) 
df.reset_index(drop=True, inplace=True)

In [106]:
# Show dataset
df

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,563238,22679,FRENCH BLUE METAL DOOR SIGN 4,10,8/15/2011 9:59,1.25,15093.0,United Kingdom
1,553566,22699,ROSES REGENCY TEACUP AND SAUCER,12,5/18/2011 9:09,2.95,12690.0,France
2,546084,22303,COFFEE MUG APPLES DESIGN,6,3/9/2011 11:28,2.55,14112.0,United Kingdom
3,572302,23533,WALL ART GARDEN HAVEN,1,10/23/2011 14:47,5.95,15427.0,United Kingdom
4,558614,22993,SET OF 4 PANTRY JELLY MOULDS,3,6/30/2011 15:56,2.46,,United Kingdom
...,...,...,...,...,...,...,...,...
49995,580848,72800E,4 IVORY DINNER CANDLES SILVER FLOCK,1,12/6/2011 11:51,0.79,18005.0,United Kingdom
49996,567742,23301,GARDENERS KNEELING PAD KEEP CALM,12,9/22/2011 10:47,1.65,14261.0,United Kingdom
49997,547387,22699,ROSES REGENCY TEACUP AND SAUCER,6,3/22/2011 16:00,2.95,12539.0,Spain
49998,563037,22352,LUNCH BOX WITH CUTLERY RETROSPOT,6,8/11/2011 15:02,2.55,12362.0,Belgium


In [107]:
# Check Data Info
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 8 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   InvoiceNo    50000 non-null  object 
 1   StockCode    50000 non-null  object 
 2   Description  49884 non-null  object 
 3   Quantity     50000 non-null  int64  
 4   InvoiceDate  50000 non-null  object 
 5   UnitPrice    50000 non-null  float64
 6   CustomerID   37337 non-null  float64
 7   Country      50000 non-null  object 
dtypes: float64(2), int64(1), object(5)
memory usage: 3.1+ MB


In [108]:
# Create New Column `Total Sales`
df['TotalSales'] = df['Quantity'] * df['UnitPrice']

## Check Missing Value

In [109]:
# Menghitung jumlah nilai null di setiap kolom
null_counts = df.isnull().sum()

# Menghitung persentase nilai null
null_percentages = (null_counts / len(df)) * 100


null_percentages


InvoiceNo       0.000
StockCode       0.000
Description     0.232
Quantity        0.000
InvoiceDate     0.000
UnitPrice       0.000
CustomerID     25.326
Country         0.000
TotalSales      0.000
dtype: float64

## Handling Missing Value

In [110]:
df = df.dropna(subset=['Description','CustomerID'])

In [111]:
df.isnull().sum()

InvoiceNo      0
StockCode      0
Description    0
Quantity       0
InvoiceDate    0
UnitPrice      0
CustomerID     0
Country        0
TotalSales     0
dtype: int64

## Change Data type Columns

In [112]:
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], format='%m/%d/%Y %H:%M', errors='coerce')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], format='%m/%d/%Y %H:%M', errors='coerce')


In [113]:
df['CustomerID'] = df['CustomerID'].astype('int64')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['CustomerID'] = df['CustomerID'].astype('int64')


In [115]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 37337 entries, 0 to 49999
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   InvoiceNo    37337 non-null  object        
 1   StockCode    37337 non-null  object        
 2   Description  37337 non-null  object        
 3   Quantity     37337 non-null  int64         
 4   InvoiceDate  37337 non-null  datetime64[ns]
 5   UnitPrice    37337 non-null  float64       
 6   CustomerID   37337 non-null  int64         
 7   Country      37337 non-null  object        
 8   TotalSales   37337 non-null  float64       
dtypes: datetime64[ns](1), float64(2), int64(2), object(4)
memory usage: 2.8+ MB


## Save Dataset to CSV File

In [117]:
df.to_csv('retail_cleaned.csv', index=False)

## Connecting to Elasticsearch

In [118]:
# Koneksi ke Elasticsearch
es = Elasticsearch('http://localhost:9200')

for i,r in df.iterrows():
    doc=r.to_dict()
    res=es.index(index="retail", doc_type="doc", body=doc)
    print(res)



{'_index': 'retail', '_type': 'doc', '_id': 'L7EhtJIBmJQq4-Ve5Iii', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9983, '_primary_term': 1}
{'_index': 'retail', '_type': 'doc', '_id': 'MLEhtJIBmJQq4-Ve5Ij7', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9984, '_primary_term': 1}
{'_index': 'retail', '_type': 'doc', '_id': 'MbEhtJIBmJQq4-Ve5Yg4', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9985, '_primary_term': 1}
{'_index': 'retail', '_type': 'doc', '_id': 'MrEhtJIBmJQq4-Ve5Yhq', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9986, '_primary_term': 1}
{'_index': 'retail', '_type': 'doc', '_id': 'M7EhtJIBmJQq4-Ve5Yic', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 9987, '_primary_term': 1}
{'_index': 'retail',