# Demo: reading files from S3 with Spark

## Libraries

In [None]:
import os
import json
import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import udf
from pyspark.sql.types import *
from pyspark.sql.functions import struct
from pyspark.sql.functions import countDistinct
import multiprocessing
def available_cores(env_var='CPU_LIMIT'):
    """Return how many CPU cores this notebook may use.

    The result is the host's ``multiprocessing.cpu_count()``, capped by the
    CPU quota stored in the environment variable ``env_var`` (a possibly
    fractional number such as ``"2"`` or ``"0.5"``).

    Args:
        env_var: name of the environment variable holding the CPU limit.

    Returns:
        An int >= 1. If ``env_var`` is unset or empty, the host core count
        is returned instead of raising ``KeyError``.

    Raises:
        ValueError: if ``env_var`` is set to a non-numeric value.
    """
    host_cores = multiprocessing.cpu_count()
    limit = os.environ.get(env_var)
    if not limit:
        return host_cores
    # int(float(...)) truncates a fractional quota such as "0.5" to 0;
    # clamp to one core, because a zero core budget is unusable.
    return max(1, min(host_cores, int(float(limit))))


N_CORES = available_cores()
print('max cores available:', N_CORES)

## Load credentials and configure Spark

In [None]:
BUCKET = 'miba-projects-21-22-sites'  # bucket that holds the demo's data files
# Credentials are read from a local JSON file so they are never typed into the
# notebook; the code below expects 'aws_access_key_id' and
# 'aws_secret_access_key' keys in it.
with open('access.json') as credentials_fh:
    access_data = json.load(credentials_fh)

In [None]:
print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    """Return the Spark UI address routed through the JupyterHub proxy.

    Installed below as the ``SparkContext.uiWebUrl`` property. It takes the
    port from the driver's real UI URL and returns
    ``<JUPYTERHUB_SERVICE_PREFIX>proxy/<port>/jobs/``. Presumably the raw
    driver host:port is not reachable from the user's browser, which is why
    the link is rewritten.
    """
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return '{}proxy/{}/jobs/'.format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

SparkContext.uiWebUrl = property(uiWebUrl)

conf = SparkConf()
# Respect the container CPU budget in N_CORES instead of `local[*]`, which
# starts one worker thread per logical core of the machine. max(1, ...)
# guards against a fractional CPU_LIMIT having truncated N_CORES to 0.
conf.set('spark.master', f'local[{max(1, N_CORES)}]')
# NOTE(review): in local mode, tasks run inside the driver JVM, so the heap is
# most likely bounded by spark.driver.memory rather than this setting — confirm.
conf.set('spark.executor.memory', '16G')
# getOrCreate makes this cell safe to re-run: constructing a second
# SparkContext while one is active raises an error in PySpark.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
spark

In [None]:
# Configure the S3A connector used to read `s3a://` paths. All settings go
# into the one Hadoop Configuration object returned by the Spark context.
hadoop_conf = spark._jsc.hadoopConfiguration()
s3a_settings = {
    'fs.s3a.access.key': access_data['aws_access_key_id'],
    'fs.s3a.secret.key': access_data['aws_secret_access_key'],
    'fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'fs.s3a.multipart.size': '104857600',  # 100 MiB
    'fs.s3a.block.size': '33554432',       # 32 MiB
    'fs.s3a.threads.max': '256',
    # HTTPS, so the access key and the object data are not sent in clear text.
    'fs.s3a.endpoint': 'https://storage.yandexcloud.net',
}
for key, value in s3a_settings.items():
    hadoop_conf.set(key, value)

## List files with `boto3`

In [None]:
# boto3 client for the storage.yandexcloud.net S3-compatible endpoint, using
# the credentials loaded from access.json. HTTPS keeps the access key and
# the listed data off the wire in clear text.
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    aws_access_key_id=access_data['aws_access_key_id'],
    aws_secret_access_key=access_data['aws_secret_access_key'],
    endpoint_url='https://storage.yandexcloud.net',
)

In [None]:
# A single ListObjects call returns at most 1000 keys and has no 'Contents'
# entry when the bucket is empty. Paginate ListObjectsV2 so that every key is
# collected and an empty bucket yields [] instead of raising KeyError.
paginator = s3.get_paginator('list_objects_v2')
all_files = [
    obj['Key']
    for page in paginator.paginate(Bucket=BUCKET)
    for obj in page.get('Contents', [])
]
print('first files:', all_files[:10])

## Read a file

In [None]:
# Build the s3a:// URI of the first listed object and load it as a
# comma-separated file with no header row.
# NOTE(review): no schema or inferSchema is given, so Spark will likely read
# every column as a string — pass one if column types matter.
file_path = 's3a://{}/{}'.format(BUCKET, all_files[0])
print('file path to load:', file_path)
sdf = spark.read.csv(file_path, sep=',', header=False)

In [None]:
# Print the column names and types Spark assigned to the loaded CSV.
sdf.printSchema()

In [None]:
# Collect only the first 5 rows to the driver as a pandas DataFrame; as the
# cell's last expression, it is rendered as the cell output.
sdf.limit(5).toPandas()

In [None]:
%%time
# Count all rows (this runs a Spark job over the data); %%time reports how long it took.
sdf.count()