# Test access to MinIO buckets with Spark

Remember to start Spark first using the little star icon (it is almost invisible, so look carefully).

This test comes from https://github.com/marcoverl/training/blob/master/CCR/S3-from-Spark-with-oidc.ipynb

In [2]:
# Get credentials to setup minio S3 access
#!pip install liboidcagent requests xmltodict pandas boto3

!eval `oidc-keychain` > /dev/null && oidc-token dodas --time=3600 > /tmp/token
with open('/tmp/token') as f:
    token = f.readlines()[0].split("\n")[0]

import requests
import xmltodict
r = requests.post("https://minio.cloud.infn.it",
                  data={
                      'Action':
                      "AssumeRoleWithWebIdentity",
                      'Version': "2011-06-15",
                      'WebIdentityToken': token,
                      'DurationSeconds': 9000
                  },
                  verify=True)

tree = xmltodict.parse(r.content)

credentials = dict(tree['AssumeRoleWithWebIdentityResponse']
                    ['AssumeRoleWithWebIdentityResult']['Credentials'])

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Build the Spark configuration: Kubernetes master, executor sizing and image,
# plus the S3A settings pointing at the MinIO endpoint. `credentials` must
# already be defined by an earlier cell (AccessKeyId, SecretAccessKey and
# SessionToken entries are read here).
conf = (SparkConf()
         .setMaster("k8s://https://kubernetes:443")
         .setAppName("MyApp")
         .set("spark.executor.memory", "1g")
         .set("spark.executor.instances", "2")
         .set("spark.kubernetes.container.image", "dodasts/spark:v3.0.1")
         # configure S3 access
         .set("spark.hadoop.fs.s3a.endpoint", "https://minio.cloud.infn.it")
         .set("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
         .set("spark.hadoop.fs.s3a.access.key", credentials["AccessKeyId"])
         .set("spark.hadoop.fs.s3a.secret.key", credentials["SecretAccessKey"])
         .set("spark.hadoop.fs.s3a.session.token", credentials["SessionToken"])
         .set("spark.hadoop.fs.s3a.path.style.access","true")
         .set("spark.hadoop.fs.s3a.fast.upload", "true")
         .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .set("spark.hadoop.fs.s3a.committer.name", "directory")
       )

# First stop the Spark context that is already bound to `sc` (if any), so that
# a new one can be created with the configuration above.
try:
    sc.stop()
except NameError:
    # No `sc` exists in this kernel yet, so there is nothing to stop.
    pass

# Then create the Spark context first, and the session after it
sc = SparkContext(conf = conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
#spark = SparkSession(sc) # this would also work
sc

In [4]:
# Check config: uncomment the next line to list the Spark settings in effect
#spark.sparkContext._conf.getAll()

In [5]:
# Check spark with local read

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

inputFile = '../test_MLCourse/Higgs100k.csv'

#into panda DF
%time dataset = pd.read_csv(inputFile)

#then into spark dataframe...remember to start spark with the little icon
%time df = spark.createDataFrame(dataset)
%time total_events = df.count()

print('There are '+str(total_events)+' events')

CPU times: user 1.34 s, sys: 135 ms, total: 1.48 s
Wall time: 1.49 s
CPU times: user 28.5 s, sys: 256 ms, total: 28.7 s
Wall time: 32 s
CPU times: user 8.77 ms, sys: 16 ms, total: 24.8 ms
Wall time: 8.51 s
There are 99999 events


In [6]:
# Load a CSV stored in a MinIO bucket through the s3a:// scheme, treating the
# first row as the header and letting Spark infer the column types
csv_df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("s3a://scratch/verlato/dpc-covid19-ita-regioni.csv")
)
csv_df.printSchema()
# Collect the result into pandas so the notebook renders it as a table
csv_df.toPandas()

root
 |-- data: timestamp (nullable = true)
 |-- stato: string (nullable = true)
 |-- codice_regione: integer (nullable = true)
 |-- denominazione_regione: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- ricoverati_con_sintomi: integer (nullable = true)
 |-- terapia_intensiva: integer (nullable = true)
 |-- totale_ospedalizzati: integer (nullable = true)
 |-- isolamento_domiciliare: integer (nullable = true)
 |-- totale_positivi: integer (nullable = true)
 |-- variazione_totale_positivi: integer (nullable = true)
 |-- nuovi_positivi: integer (nullable = true)
 |-- dimessi_guariti: integer (nullable = true)
 |-- deceduti: integer (nullable = true)
 |-- casi_da_sospetto_diagnostico: integer (nullable = true)
 |-- casi_da_screening: integer (nullable = true)
 |-- totale_casi: integer (nullable = true)
 |-- tamponi: integer (nullable = true)
 |-- casi_testati: integer (nullable = true)
 |-- note: string (nullable = true)
 |-- ingressi_te

Unnamed: 0,data,stato,codice_regione,denominazione_regione,lat,long,ricoverati_con_sintomi,terapia_intensiva,totale_ospedalizzati,isolamento_domiciliare,...,deceduti,casi_da_sospetto_diagnostico,casi_da_screening,totale_casi,tamponi,casi_testati,note,ingressi_terapia_intensiva,note_test,note_casi
0,2020-02-24 18:00:00,ITA,13,Abruzzo,42.351222,13.398438,0,0,0,0,...,0,,,0,5,,,,,
1,2020-02-24 18:00:00,ITA,17,Basilicata,40.639471,15.805148,0,0,0,0,...,0,,,0,0,,,,,
2,2020-02-24 18:00:00,ITA,18,Calabria,38.905976,16.594402,0,0,0,0,...,0,,,0,1,,,,,
3,2020-02-24 18:00:00,ITA,15,Campania,40.839566,14.250850,0,0,0,0,...,0,,,0,10,,,,,
4,2020-02-24 18:00:00,ITA,8,Emilia-Romagna,44.494367,11.341721,10,2,12,6,...,0,,,18,148,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6358,2020-12-22 17:00:00,ITA,19,Sicilia,38.115697,13.362357,1059,176,1235,32257,...,2203,,,86092,1155288,773208.0,,7,,
6359,2020-12-22 17:00:00,ITA,9,Toscana,43.769231,11.255889,941,175,1116,10721,...,3495,,,116544,1806133,1035769.0,,3,Positivi diagnosticati solo con test antigenic...,
6360,2020-12-22 17:00:00,ITA,10,Umbria,43.106758,12.388247,244,44,288,3441,...,578,,,27396,481344,245205.0,,5,,
6361,2020-12-22 17:00:00,ITA,2,Valle d'Aosta,45.737503,7.320149,90,5,95,361,...,368,,,7073,60537,36564.0,,1,,


In [7]:
# Same check against my own bucket: the CSV used as input for the NLP test
csv_df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("s3a://legger/NLPInput/message_example.csv")
)
csv_df.printSchema()
# Collect the result into pandas so the notebook renders it as a table
csv_df.toPandas()

root
 |-- error_category: string (nullable = true)
 |-- error_message: string (nullable = true)



Unnamed: 0,error_category,error_message
0,PERMISSION_DENIED,[gfalt_copy_file][perform_copy][gfal_http_copy...
1,PERMISSION_DENIED,[gfalt_copy_file][perform_copy][gfal_http_copy...
2,COMMUNICATION_ERROR_ON_SEND,DESTINATION OVERWRITE srm-ifce err: Communicat...
3,INVALID_REQUEST_DESCRIPTOR,DESTINATION MAKE_PARENT srm-ifce err: Invalid ...
4,CONNECTION_TIMED_OUT,DESTINATION SRM_PUT_TURL srm-ifce err: Connect...
...,...,...
711823,COMMUNICATION_ERROR_ON_SEND,DESTINATION OVERWRITE srm-ifce err: Communicat...
711824,COMMUNICATION_ERROR_ON_SEND,DESTINATION OVERWRITE srm-ifce err: Communicat...
711825,COMMUNICATION_ERROR_ON_SEND,DESTINATION OVERWRITE srm-ifce err: Communicat...
711826,COMMUNICATION_ERROR_ON_SEND,DESTINATION OVERWRITE srm-ifce err: Communicat...


In [8]:
# Same check with the Higgs CSV from my bucket; only the inferred schema is printed
df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("s3a://legger/MLCourseInput/Higgs1M.csv")
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- label: double (nullable = true)
 |-- lepton_pT: double (nullable = true)
 |-- lepton_eta: double (nullable = true)
 |-- lepton_phi: double (nullable = true)
 |-- missing_energy_magnitude: double (nullable = true)
 |-- missing_energy_phi: double (nullable = true)
 |-- jet1_pt: double (nullable = true)
 |-- jet1_eta: double (nullable = true)
 |-- jet1_phi: double (nullable = true)
 |-- jet1_b-tag: double (nullable = true)
 |-- jet2_pt: double (nullable = true)
 |-- jet2_eta: double (nullable = true)
 |-- jet2_phi: double (nullable = true)
 |-- jet2_b-tag: double (nullable = true)
 |-- jet3_pt: double (nullable = true)
 |-- jet3_eta: double (nullable = true)
 |-- jet3_phi: double (nullable = true)
 |-- jet3_b-tag: double (nullable = true)
 |-- jet4_pt: double (nullable = true)
 |-- je4_eta: double (nullable = true)
 |-- jet4_phi: double (nullable = true)
 |-- jet4_b-tag: double (nullable = true)
 |-- m_jj: double (nullable = true)
 |-- m_jjj: 

In [9]:
# Stop the Spark session now that the S3 access checks are done
spark.stop()