In [1]:
# Bootstrap: must run before the `from pyspark import ...` cell below.
# Presumably findspark locates the local Spark installation and makes the
# `pyspark` package importable — assumes SPARK_HOME is set or Spark sits in
# a location findspark can discover; verify on the notebook host.
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
import os

import boto3
import pandas as pd

# 1) Test MinIO with Spark

### 1.1 Configure the Spark session

In [4]:
# Connection settings for the MinIO S3 endpoint. Values are read from the
# environment when set; the fallbacks are the local test-stack values this
# notebook originally hardcoded (never commit real credentials here).
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio123")

conf = SparkConf().setAll([
    # Fixed driver-side host/ports, presumably so the standalone cluster can
    # reach the driver on the notebook host "research" — verify firewall rules.
    # NOTE(review): fileserver/broadcast/replClassServer ports may be ignored
    # by newer Spark versions — confirm against the cluster's Spark version.
    ("spark.driver.host", "research"),
    ("spark.driver.port", "7001"),
    ("spark.fileserver.port", "7002"),
    ("spark.broadcast.port", "7003"),
    ("spark.replClassServer.port", "7004"),
    ("spark.blockManager.port", "7005"),
    ("spark.submit.deployMode", "client"),
    ("spark.executor.memory", "2g"),
    ("spark.executor.cores", "2"),
    ("spark.cores.max", "2"),
    ("spark.driver.memory", "8g"),
    # S3A (MinIO) settings. Keys prefixed with "spark.hadoop." are copied into
    # the Hadoop configuration, replacing the private
    # spark._jsc.hadoopConfiguration().set(...) calls.
    ("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY),
    ("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
    ("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT),
]).setMaster("spark://master:7077").setAppName("test_app")

# Build the session (and its SparkContext) from the conf in one step.
# Previously a SparkContext was created first and then
# SparkSession(sc).builder.appName("test_app").getOrCreate() reused it, so the
# app name was never applied.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

### 1.2 Create a test pandas DataFrame

In [5]:
# Small column-oriented test frame: integer column `a`, string column `b`.
df_local = pd.DataFrame(
    {
        "a": [1, 2, 3, 4, 21],
        "b": ["a", "v", "c", "a", "v"],
    }
)
df_local

Unnamed: 0,a,b
0,1,a
1,2,v
2,3,c
3,4,a
4,21,v


In [6]:
# Sanity check: Spark can ingest the pandas frame; print just the first row.
preview_sdf = spark.createDataFrame(df_local)
preview_sdf.show(n=1)

+---+---+
|  a|  b|
+---+---+
|  1|  a|
+---+---+
only showing top 1 row



### 1.3 Create a PySpark DataFrame and write it to MinIO S3

In [7]:
# Convert the pandas frame to Spark and store it under s3a://test/tmp as a
# single CSV part file (one partition) with a header row, replacing any
# output from a previous run.
(
    spark.createDataFrame(df_local)
    .repartition(1)
    .write
    .csv("s3a://test/tmp", mode="overwrite", header=True)
)

### 1.4 Read the DataFrame back from S3 and create a temporary view

In [8]:
# Read the CSV part file(s) written above back from MinIO.
# An explicit DDL schema keeps `a` numeric (BIGINT, matching the int64 pandas
# column it was written from); without it every CSV column is read as a string
# and the aggregation below would rely on an implicit string-to-double cast.
df = spark.read.csv(
    "s3a://test/tmp/*.csv",
    header=True,
    schema="a BIGINT, b STRING",
)
# Register the frame for Spark SQL under the name `tmp_table`.
df.createOrReplaceTempView("tmp_table")

### 1.5 Make a new Spark DataFrame with an aggregation

In [9]:
# Mean of `a` for each distinct `b`, computed with Spark SQL over the temp view.
agg_query = """
SELECT 
b,
avg(a) as a_mean
FROM tmp_table 
GROUP BY b
"""
df_agg = spark.sql(agg_query)
df_agg.show(n=5)

+---+------+
|  b|a_mean|
+---+------+
|  v|  11.5|
|  c|   3.0|
|  a|   2.5|
+---+------+



### 1.6 Write the aggregated DataFrame back to S3

In [10]:
# Persist the aggregate under s3a://test/tmp_agg as a single CSV part file
# with a header row, replacing any output from a previous run.
(
    df_agg
    .repartition(1)
    .write
    .csv("s3a://test/tmp_agg", mode="overwrite", header=True)
)

# 2) Access MinIO S3 with boto3

In [11]:
# Connect to the same MinIO endpoint with boto3 and list everything in the
# `test` bucket (should include the Spark outputs from section 1).
# Endpoint and credentials come from the environment when set; the fallbacks
# are the local test-stack values this notebook originally hardcoded.
s3 = boto3.resource(
    "s3",
    endpoint_url=os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
    aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY", "minio"),
    aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
)
bucket = s3.Bucket("test")
list(bucket.objects.all())

[s3.ObjectSummary(bucket_name='test', key='tmp/_SUCCESS'),
 s3.ObjectSummary(bucket_name='test', key='tmp/part-00000-6103a198-4f42-4318-9158-279ea07134f0-c000.csv'),
 s3.ObjectSummary(bucket_name='test', key='tmp_agg/_SUCCESS'),
 s3.ObjectSummary(bucket_name='test', key='tmp_agg/part-00000-67ed529f-57b1-4ab8-8120-2a8a53814069-c000.csv')]