# Applied Project in Big Data on Industrial Dataset

## DATA COLLECTION TECHNIQUES
## Part V. Loading data from object storage and preprocessing

### 1. Libraries and credentials

Documentation for the [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) library (the AWS SDK for Python, used here as an S3 client).

In [None]:
import os
import sys
import json
import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Show every column (no truncation) when displaying wide DataFrames in the notebook.
pd.set_option('display.max_columns', None)

In [None]:
def access_data(file_path):
    """Load storage-access settings from a JSON file.

    Args:
        file_path: path to a JSON file. The notebook later reads the keys
            'aws_access_key_id', 'aws_secret_access_key' and 'bucket' from
            the result.

    Returns:
        The parsed JSON document.
    """
    # JSON text is UTF-8 (RFC 8259); don't depend on the platform's locale encoding.
    with open(file_path, encoding='utf-8') as file:
        return json.load(file)

# Storage credentials and bucket name come from a local JSON file.
# NOTE(review): this file holds secrets — presumably it should be git-ignored.
creds = access_data(file_path='access_bucket.json')
# Print only the key names, not the secret values.
print(creds.keys())

### 2. Session and client for loading

In [None]:
# boto3 S3 client pointed at Yandex Object Storage through `endpoint_url`,
# authenticated with the keys from the credentials file.
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    aws_access_key_id=creds['aws_access_key_id'],
    aws_secret_access_key=creds['aws_secret_access_key'],
    endpoint_url='https://storage.yandexcloud.net'
)

In [None]:
# The bucket name is also read from the credentials file.
VK_DATA_BUCKET = creds['bucket']

In [None]:
# A single list_objects call returns at most 1000 keys, so page through the
# listing to collect every key in the bucket (same lexicographic order).
paginator = s3.get_paginator('list_objects')
all_files = [
    obj['Key']
    for page in paginator.paginate(Bucket=VK_DATA_BUCKET)
    for obj in page.get('Contents', [])  # an empty listing has no 'Contents'
]
print('files in storage:', all_files[:10])

### 3. Load data from the storage

In [None]:
# Pick one object by its position in the listing (index 1: an arbitrary sample).
file_to_load = all_files[1]
print('file to load:', file_to_load)
get_object_response = s3.get_object(
    Bucket=VK_DATA_BUCKET, 
    Key=file_to_load
)

In [None]:
# Inspect the raw response: metadata plus the streamed 'Body'.
get_object_response

In [None]:
# Parse the streamed object body as JSON.
data = json.load(get_object_response['Body'])
type(data)

In [None]:
# Indexing with [0] assumes the top-level JSON value is a list — confirm per file.
data[0]

In [None]:
# Small-scale (pandas) view of the same records.
df = pd.DataFrame(data)
df.head()

### 4. Time for Big Data (Spark)

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime, col

In [None]:
# Local Spark: 5 worker threads, 12 GB driver memory.
conf = SparkConf()
conf.set('spark.master', 'local[5]')
conf.set('spark.driver.memory', '12G')
# getOrCreate() reuses an already running context, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once" (conf is ignored in that case).
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Hadoop S3A connector settings so Spark can read s3a:// paths from
# Yandex Object Storage with the same credentials as the boto3 client.
hadoop_conf = spark._jsc.hadoopConfiguration()
s3a_settings = {
    'fs.s3a.access.key': creds['aws_access_key_id'],
    'fs.s3a.secret.key': creds['aws_secret_access_key'],
    'fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'fs.s3a.multipart.size': '104857600',  # 100 MiB
    'fs.s3a.block.size': '33554432',  # 32 MiB
    'fs.s3a.threads.max': '256',
    # HTTPS, like the boto3 client above; plain http would send requests
    # (and the data read) unencrypted.
    'fs.s3a.endpoint': 'https://storage.yandexcloud.net',
    'fs.s3a.aws.credentials.provider':
        'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
}
for key, value in s3a_settings.items():
    hadoop_conf.set(key, value)

In [None]:
# First ten keys of the listing.
all_files[:10]

In [None]:
all_files[3]

In [None]:
# Read one object through the s3a:// connector.
file_path = f's3a://{VK_DATA_BUCKET}/' + all_files[3]
sdf = spark.read.json(file_path)

In [None]:
sdf.show()

In [None]:
# One row per element of the `items` array (explode names the new column `col`).
sdf = sdf.select(F.explode(sdf.items))

In [None]:
sdf.show()

In [None]:
def flat_df(df, prefix=None):
    """Flatten one level of struct columns of a Spark DataFrame.

    Non-struct columns are kept first, in their original order. Each struct
    column is then replaced by its fields, in schema order.

    Args:
        df: Spark DataFrame to flatten.
        prefix: naming rule for promoted fields. If truthy, field ``f`` becomes
            ``prefix + f``. If falsy (``None`` or ``''``), it becomes
            ``<struct column>_<f>``.

    Returns:
        A new DataFrame. Only one nesting level is removed; call again to
        unpack structs nested inside structs.
    """
    from pyspark.sql.types import StructType

    def quoted(*parts):
        # Backtick-quote every name part so names containing dots, spaces or
        # other special characters are not misparsed as nested paths.
        return '.'.join('`' + part.replace('`', '``') + '`' for part in parts)

    flat_cols = []
    nested_cols = []
    # Inspect the schema directly instead of parsing dtype strings and running
    # an extra select per struct just to list its fields.
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            nested_cols.append(field)
        else:
            flat_cols.append(F.col(quoted(field.name)))

    promoted = [
        F.col(quoted(ncol.name, sub.name)).alias(
            prefix + sub.name if prefix else ncol.name + '_' + sub.name
        )
        for ncol in nested_cols
        for sub in ncol.dataType.fields
    ]
    return df.select(flat_cols + promoted)

In [None]:
# prefix='' is falsy, so flat_df names the fields `col_<field>` (after explode's `col`).
sdf = flat_df(sdf, prefix='')

In [None]:
sdf.show(3)

In [None]:
sdf.limit(5).toPandas()

In [None]:
sdf.count()

In [None]:
# A glob reads every wall_owner_id_*.json file of one dump folder in a single call
file_path = f's3a://{VK_DATA_BUCKET}/gsom_ma-2023-11-20-01-00-05-925736/walls/wall_owner_id_*.json'
sdf = spark.read.json(file_path)

In [None]:
sdf.count()

In [None]:
sdf.limit(5).toPandas()

In [None]:
# One row per element of `groups`, flattened to `col_<field>` columns.
sdf = sdf.select(F.explode(sdf.groups))
sdf = flat_df(sdf, prefix='')

In [None]:
sdf.limit(5).toPandas()

### 5. Data processing and basic EDA - VK

In [None]:
file_path = [
    f's3a://{VK_DATA_BUCKET}/gsom_abiturient-2023-11-14-*/wall_owner_id_*.json',
    f's3a://{VK_DATA_BUCKET}/gsom_ma-2023-11-20-*/wall_owner_id_*.json',
    f's3a://{VK_DATA_BUCKET}/gsom_spbu-2023-11-15-*//wall_owner_id_*.json'
]
sdf = spark.read.json(file_path)

In [None]:
sdf.show()

#### Creating dataset v1

In [None]:
sdf1 = sdf.select(F.explode(sdf.groups), sdf.items)
sdf1 = flat_df(sdf1, prefix='')
sdf1.limit(5).toPandas()

In [None]:
sdf1 = sdf1.select(sdf1.col_id, sdf1.col_name, F.explode(sdf.items))
sdf1 = flat_df(sdf1, prefix='')
sdf1.limit(5).toPandas()

In [None]:
sdf1.groupBy("col_name").count().show()

In [None]:
sdf1.count()

#### Creating dataset v2

In [None]:
# v2: build groups and posts as separate tables, then join them on the wall owner.
sdf_groups = sdf.select(F.explode(sdf.groups))
sdf_groups = flat_df(sdf_groups, prefix='')

In [None]:
sdf_groups.limit(5).toPandas()

In [None]:
# One row per post (element of `items`), fields flattened to `col_<field>`.
sdf_posts = sdf.select(F.explode(sdf.items))
sdf_posts = flat_df(sdf_posts, prefix='')

In [None]:
sdf_posts.limit(5).toPandas()

In [None]:
# Strip the minus sign from owner/author ids so owner ids can match the group ids in
# the join below. Presumably community walls use negative owner ids while group ids
# are positive — confirm. Note: regexp_replace turns these into string columns.
sdf_posts = sdf_posts.withColumn('col_owner_id', F.regexp_replace('col_owner_id', '-', ''))
sdf_posts = sdf_posts.withColumn('col_from_id', F.regexp_replace('col_from_id', '-', ''))

In [None]:
sdf_posts.limit(5).toPandas()

In [None]:
# Inner join: attach group metadata to each post; posts with no matching group are dropped.
sdf_posts = sdf_posts.join(sdf_groups, sdf_posts.col_owner_id == sdf_groups.col_id)

In [None]:
sdf_posts.limit(5).toPandas()

In [None]:
sdf_posts.count()

In [None]:
# Posts per group.
sdf_posts.groupBy("col_name").count().show()

In [None]:
sdf_posts.select('col_name').distinct().collect()

In [None]:
# Posts of the group named 'Высшая школа менеджмента СПбГУ' (Graduate School of Management, SPbU).
sdf_posts.filter(sdf_posts.col_name == 'Высшая школа менеджмента СПбГУ').count()

In [None]:
# Sample of that group's posts.
sdf_posts \
    .filter(sdf_posts.col_name == 'Высшая школа менеджмента СПбГУ') \
    .limit(5) \
    .toPandas()

In [None]:
# Final post dataset: date, engagement counters, text and group identity.
# Exact duplicate rows are dropped (presumably the same post can be read from
# more than one dump — confirm).
sdf = sdf_posts.select(
    sdf_posts.col_date,
    sdf_posts.col_comments,
    sdf_posts.col_likes,
    sdf_posts.col_reposts,
    sdf_posts.col_views,
    sdf_posts.col_text,
    sdf_posts.col_name,
    sdf_posts.col_screen_name
).dropDuplicates()
sdf.limit(5).toPandas()

In [None]:
sdf.count()

In [None]:
sdf.groupBy("col_name").count().show()

### 6. Data processing and basic EDA - AI Jobs

In [None]:
file_path = './ai_jobs_data/*.json'
sdf = spark.read.json(file_path)

In [None]:
sdf.printSchema()

In [None]:
sdf.show()

In [None]:
sdf.describe()

In [None]:
sdf.limit(5).toPandas()

In [None]:
df = sdf.toPandas()

In [None]:
df.describe()

In [None]:
df.employmentType.hist()
plt.show()

In [None]:
df.baseSalary

In [None]:
df.baseSalary[0]

#### 6.1. What is the target?

In [None]:
sdf = flat_df(sdf, prefix='')
sdf.limit(2).toPandas()

In [None]:
# once againg to unpack all inner structures
sdf = flat_df(sdf, prefix='')
sdf.limit(2).toPandas()

In [None]:
split_col = F.split(sdf['salary_range'], ' ')

sdf = sdf.withColumn('currency', split_col.getItem(0))
sdf = sdf.withColumn('salary_value_min', split_col.getItem(1))
sdf = sdf.withColumn('salary_value_max', split_col.getItem(3))

sdf = sdf.withColumn('salary_value_min', 
                     F.regexp_replace('salary_value_min', 'K', ''))
sdf = sdf.withColumn('salary_value_min', col('salary_value_min') * 1000)
sdf = sdf.withColumn('salary_value_max', 
                     F.regexp_replace('salary_value_max', 'K', ''))
sdf = sdf.withColumn('salary_value_max', col('salary_value_max') * 1000)

In [None]:
sdf.limit(2).toPandas()

#### 6.2. Dataset for modelling

In [None]:
sdf_ds = sdf.select(
    sdf.currency.alias('cur_1'),
    sdf.salary_value_min.alias('salary_min_1'),
    sdf.salary_value_max.alias('salary_max_1'),
    sdf.baseSalary_currency.alias('cur_2'),
    sdf.baseSalary_value_maxValue.alias('salary_min_2'),
    sdf.baseSalary_value_minValue.alias('salary_max_2'),
    sdf.baseSalary_value_unitText.alias('salary_period'),
    sdf.benefits.alias('benefits'),
    sdf.company,
    sdf['hiringOrganization_@type'].alias('company_type'),
    sdf.description,
    sdf.employmentType.alias('type'),
    sdf.level,
    sdf.jobLocation_address_addressCountry.alias('country'),
    sdf.location,
    sdf.position,
    sdf.skills,
).dropDuplicates()
sdf_ds.limit(5).toPandas()

In [None]:
df = sdf_ds.toPandas()
df.info()

#### 6.3. Some EDA

In [None]:
# Summary statistics of the float64 and object (string) columns, transposed for readability.
df.describe(include=['float64', 'object']).T

In [None]:
import seaborn as sns

In [None]:
# Distribution of the parsed lower salary bound per employment type.
plt.figure(figsize=(14, 10))
sns.violinplot(data=df, x='salary_min_1', y='type')
plt.show()

In [None]:
# Frequencies of the currencies parsed from `salary_range`.
df.cur_1.hist()
plt.show()

In [None]:
# Frequencies of employment types.
df.type.hist()
plt.show()

In [None]:
# Country frequencies; vertical tick labels keep the many country names readable.
plt.figure(figsize=(18, 8))
df.country.hist(bins=60)
plt.xticks(rotation='vertical')
plt.show()

### 7. Texts processing

In [None]:
# Files of the text part; their names encode the label (parsed from `file` below).
os.listdir('articles_data')

In [None]:
file_path = './articles_data/*.json'
sdf = spark.read.json(file_path)

In [None]:
sdf.limit(2).toPandas()

In [None]:
# Record the source file of every row; the label is derived from it next.
sdf = sdf.withColumn('file', F.input_file_name())
sdf.limit(2).toPandas()

In [None]:
# Derive the class label from file names of the form ".../articles_lbl_<label>.json".
# Take everything after "articles_lbl_" in the last path segment instead of stripping a
# hard-coded absolute prefix, so the cell works wherever the notebook's folder lives.
sdf = sdf.withColumn(
    'file',
    F.regexp_extract('file', r'articles_lbl_([^/]*)$', 1)
)
# Escape the dot and anchor at the end: an unescaped '.json' matches any character
# followed by "json" anywhere in the name.
sdf = sdf.withColumn(
    'label',
    F.regexp_replace('file', r'\.json$', '')
)
sdf.limit(2).toPandas()

In [None]:
# One row per element of the `articles` array, tagged with its file's label.
sdf = sdf.select(
    sdf.label,
    F.explode(sdf.articles)
)
sdf.limit(5).toPandas()

In [None]:
sdf.printSchema()

In [None]:
# prefix='' is falsy, so flat_df names the exploded struct's fields `col_<field>`.
sdf = flat_df(sdf, prefix='')
sdf.limit(5).toPandas()

In [None]:
# Text dataset: label, title (the article's `name` field) and annotation.
sdf_ds = sdf.select(
    sdf.label,
    sdf.col_name.alias('title'),
    sdf.col_annotation.alias('annotation')
)
sdf_ds.limit(5).toPandas()

In [None]:
# Class balance: number of articles per label.
sdf_ds.groupBy('label').count().show()