In [59]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("NORD_Task") \
      .config("spark.redis.host", "cache") \
      .config("spark.redis.port", "6379") \
      .getOrCreate()

#.config("spark.redis.auth", "password") \
sc = spark.sparkContext




hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

S3_BUCKET = 's3-nord-challenge-data'
S3_REGION = 'eu-central-1'
hadoop_conf.set("fs.s3a.endpoint", f"s3.{S3_REGION}.amazonaws.com")





### Note 
In general I can see 2 approaches to load files data:
   - Approach 1.
     - `spark.read.format('binaryFile').option("pathGlobFilter","<path-glob>").load(<s3-bucket>)`. This solution would read all files with metadata into single DataFrame (path, mod time,  length, content)
     - parse content of file in apropriate resulted dataframe transformation
   - The advantage of it is that you receive parallelized DataFrame, content of file would be read in lazy way during processing each file. So in theory on a big enough spark cluster spark should take care of distributing and performance for you. The problem seems to be when you have to work with pretty read milions of files with unknown file size. You may end up huge memory and performance issues. This problem is shown e.g. in [this blog article](https://wrightturn.wordpress.com/2015/07/22/getting-spark-data-from-aws-s3-using-boto-and-pyspark/). Although it's pretty old I did not find any more recent solution to the issue. It also describes second approach.
   - Approach 2.
       - list all objects you're interested files in s3 bucket into some collection (but without parallelizing it)
       - create parallelized dataframe based on the given collection
       - read and process file content as part of transformations
   - The bottleneck might that you have to iterate over millions of files so the size of the collection to be processed (on one node) might be huge. 
   
As I am not able to test on a large set of data and big enough spark cluster which approach is more efficient. I am going to use approach described in [mentioned article](https://wrightturn.wordpress.com/2015/07/22/getting-spark-data-from-aws-s3-using-boto-and-pyspark). However instead of using boto3 for listing all objects I will use [`hadoop.fs.path.getFilesystem.globStatus`](https://stackoverflow.com/a/67050173/2018369) because boto3 [seems to be not the most effective way](https://stackoverflow.com/q/69920805/2018369) to get file list.

I was also considering one more approach, which however I could not find any good way to implement. So my idea was to create a dataframe similar to the one created by `spark.read.format('binaryFile').option("pathGlobFilter","<path-glob>").load(<s3-bucket>)`, but which contain only prefix of file (first 1024 or 2048 bytes). This way we could have a Dataframe(path, mod time,  length, PE headers), we could process the header of file to get all required PE metadata apart from imports/expors and in next step we could load apropriate sections of file to get imports/exports.


In [60]:
clean_path = '/0/*.???'
malware_path = '/1/*.???'

# number of files to process - will be read as input
# N = 100
N = 10




cleanPath = sc._jvm.org.apache.hadoop.fs.Path(f's3a://{S3_BUCKET}{clean_path}')
cFs = cleanPath.getFileSystem(hadoop_conf)
clean_files = cFs.globStatus(cleanPath)

malwarePath = sc._jvm.org.apache.hadoop.fs.Path(f's3a://{S3_BUCKET}{malware_path}')
mFs = malwarePath.getFileSystem(hadoop_conf)
malware_files = mFs.globStatus(malwarePath)

In [61]:
import random
print(len(malware_files))
files_to_process = random.sample(clean_files, int(N/2))+ random.sample(malware_files, int(N/2))
print(len(files_to_process))

malware_files

14652
10


JavaObject id=o772

In [63]:
# put files into dataFrame

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([       
    StructField('path', StringType(), True),
    StructField('size', IntegerType(), True),
    StructField('type', StringType(), True)
])
data = [(f.getPath().toUri().getRawPath(), f.getLen(), f.getPath().getName().split('.')[-1]) for f in files_to_process]

# make sure we don't have duplicates
filesDF= spark.createDataFrame(data=data, schema = schema).distinct()
filesDF.orderBy("size").show()

+--------------------+-------+----+
|                path|   size|type|
+--------------------+-------+----+
|/1/XKPsTlAqtlk2Fm...|   8192| exe|
|/0/xcua1geDkndLXH...|  38696| dll|
|/1/NfvSArPmwLKzTY...| 229888| exe|
|/0/bEUTm0O7gYAVGU...| 298496| dll|
|/0/XhleoQ8IR9jvUl...| 418816| exe|
|/1/cWd4vdJHdYO7zF...| 560792| exe|
|/0/1KI0vsoLXqDQrF...| 588800| dll|
|/1/t4MbasjrK7n3xK...| 681472| exe|
|/0/9T34YiiOwFTzxy...| 773416| dll|
|/1/WFz7wDUGRiJ545...|1051488| exe|
+--------------------+-------+----+



In [44]:
filesDF.orderBy('size').show(50,False)

+---------------------------------------+------+----+
|path                                   |size  |type|
+---------------------------------------+------+----+
|/0/T6zeSrdrxFRWSefMv69kTtEkxUiHBSQy.dll|6722  |dll |
|/0/UGNcXHVmHeWItVzZxl117HGxt957kl4x.exe|32768 |exe |
|/1/l3gQuVcjXpUTaNpZ2uJM8t1X7ZRUPAwP.exe|61440 |exe |
|/0/jP1L1vJYFw5nWjp6TqyPCDtIIh2E11Tj.exe|66707 |exe |
|/0/xI6PVtdCNEijgZLIRbkWsEIm1gvYRmWG.dll|70144 |dll |
|/1/cYEjTCfkQHMqfsLPdbdTHJKkHKdJwZIJ.exe|130560|exe |
|/0/YqnaL9g4ZZG8WeBM1xcNHR412qOZDW34.dll|304312|dll |
|/1/p66KTaeMvnaTJggUaZU5tyd83uzVg7q0.exe|338656|exe |
|/1/PnMnO2nDypZRVWqymOSIdjcCFmQwYV27.exe|352256|exe |
|/1/r80hIeF9xL6rwCwqhXfT7VqxAFdjSAlA.exe|556544|exe |
+---------------------------------------+------+----+



# Database Notes
I was considering SQL and NoSQL (key/value store) to store files info. Finally **Hybrid approach was used**

### SQL Database - MySQL
The architecture is rather not complicated. All **distinct** file records are processed and stored in one table with following schema
 `path Varchar primary key, size Int, type Varchar, architecture Varchar default NULL, imports Int default NULL, exports Int default NULL, INDEX(size, type));`
At the whole table is loaded into DataFrame. It is substracted from task files DF to ensure already processed files are skipped. And after processing transformed DF is appended to existing table in MySQL.

Although number of files processed can reach (hundred of) millions [MySQL should handle it properly](https://dba.stackexchange.com/questions/20335/can-mysql-reasonably-perform-queries-on-billions-of-rows) with proper indexes. If there are billions of rows in DB we might start [encontering problems](https://stackoverflow.com/questions/38346613/mysql-and-a-table-with-100-millions-of-rows)
In case of performance issue using different Database type might be considered as changing DB should be relatively easy. What should be changed in that case is `dataframe.write.` `format` and `options`

### NoSQL solutions

I was considering also NoSQL database which very often perform better in distributed environment and in most cases scale horizontally much easier than classical SQL DB. For this task I consider key/value store as a good solution.

#### Aerospike
Aerospike was considered as it promises high efficiency, distributed (based on shared nothing architecture) database for storing key/value pairs. In commercial version it support pyspark distributed operations, direct import to RDDs etc. So if required it might give very good performance.

#### Redis
Open source, in-memory data store used as a database, cache, streaming engine, and message broker.


### "hybrid" approach - caching
The issue with in-memory key/value store is that it does not provide (by default) persistence of data.
This can be achieved both in Redis and Aerospike of course but not by default.

My idea is to provide hybrid solution in which processed files data is stored in classical SQL database but apart from that it is also imported into key/value store. In that case there is no need to load all existing entries into DataFrame prior to processing new entries just to make sure some files weren't already processed. Instead,  `filesDF` entries that exists in key/value store should be filtered during transformation. As a last steps  `filesDB`should be saved (appended) not only to SQL database but also to key/value store




In [64]:
# get files already processed from redis cache

redis_files_info = spark.read.format("org.apache.spark.sql.redis").schema(schema)\
    .option("table", "s3").option("key.column", "path").load()


redis_files_info.show()





+--------------------+-------+----+
|                path|   size|type|
+--------------------+-------+----+
|/0/PBwWGJjJbzrfE2...|1114112| dll|
|/0/Whm7LXkmxzqBR4...| 234176| dll|
|/1/ut4TdmD1XnhHTy...| 766387| exe|
|/1/eivfiuWpSzR2DV...| 256409| exe|
|/0/2HkQQtceLYG5qJ...| 508928| dll|
|/0/srtVrrdHVHWXl2...|    497| exe|
|/0/UTOlOmrLoMroQq...|  23040| dll|
|/0/XvoKanXSZKeU1D...| 474112| dll|
|/1/oVIvHlkfisf4ln...| 356352| exe|
|/0/t73yGlRDuPunnz...|1331787| dll|
|/1/fcnuEjIwztO1ck...|1655296| exe|
|/1/JuUI8VlbhtkmGc...|1150976| exe|
|/1/pIx6g2xUNiPkua...| 621816| dll|
|/1/WY6jyp1WzaMDqZ...| 473383| exe|
|/0/oZstsPb7bzgVpT...|  26112| exe|
|/1/oWOylQbpGUjJss...| 430368| exe|
|/0/91U3yBg2klbOen...| 247296| dll|
|/1/TD8HhGIsOUh7rN...| 176128| exe|
|/1/j5UULHiUNKaJWU...| 561152| exe|
|/0/efRIRGigT73waI...|  40248| dll|
+--------------------+-------+----+



In [65]:
# Remove files that exists in DB from list of files to process
filesDF = filesDF.subtract(redis_files_info)
filesDF.show()

+--------------------+-------+----+
|                path|   size|type|
+--------------------+-------+----+
|/0/1KI0vsoLXqDQrF...| 588800| dll|
|/1/WFz7wDUGRiJ545...|1051488| exe|
|/0/bEUTm0O7gYAVGU...| 298496| dll|
|/1/NfvSArPmwLKzTY...| 229888| exe|
|/0/XhleoQ8IR9jvUl...| 418816| exe|
|/1/t4MbasjrK7n3xK...| 681472| exe|
|/1/XKPsTlAqtlk2Fm...|   8192| exe|
|/1/cWd4vdJHdYO7zF...| 560792| exe|
|/0/9T34YiiOwFTzxy...| 773416| dll|
|/0/xcua1geDkndLXH...|  38696| dll|
+--------------------+-------+----+



In [68]:
# process files 
from functools import partial
from utils import parse_file

schema_with_meta = StructType(filesDF.schema.fields+[
    StructField('architecture', StringType(), True),
    StructField('imports', IntegerType(), True),
    StructField('exports',IntegerType(), True)
])

parsed=filesDF.rdd.map(partial(parse_file, bucket=S3_BUCKET, region=S3_REGION))

parsedDF = parsed.toDF(schema_with_meta)
parsedDF.show()


<botocore.response.StreamingBody object at 0x7f676ad298a0>          (0 + 1) / 1]
<botocore.response.StreamingBody object at 0x7f676a324400>
<botocore.response.StreamingBody object at 0x7f676a5deec0>
<botocore.response.StreamingBody object at 0x7f676a8d9f60>
<botocore.response.StreamingBody object at 0x7f676a892d70>
<botocore.response.StreamingBody object at 0x7f676aaabf40>
<botocore.response.StreamingBody object at 0x7f676aaed2d0>
<botocore.response.StreamingBody object at 0x7f676ab0a5c0>
<botocore.response.StreamingBody object at 0x7f676a471c90>


+--------------------+-------+----+------------+-------+-------+
|                path|   size|type|architecture|imports|exports|
+--------------------+-------+----+------------+-------+-------+
|/0/1KI0vsoLXqDQrF...| 588800| dll|          64|    243|    150|
|/1/WFz7wDUGRiJ545...|1051488| exe|          32|    150|      3|
|/0/bEUTm0O7gYAVGU...| 298496| dll|          64|    212|      2|
|/1/NfvSArPmwLKzTY...| 229888| exe|          32|     69|      0|
|/0/XhleoQ8IR9jvUl...| 418816| exe|          64|    283|      0|
|/1/t4MbasjrK7n3xK...| 681472| exe|          32|      1|      0|
|/1/XKPsTlAqtlk2Fm...|   8192| exe|        null|     -1|     -1|
|/1/cWd4vdJHdYO7zF...| 560792| exe|          32|    200|      0|
|/0/9T34YiiOwFTzxy...| 773416| dll|          64|    172|     28|
|/0/xcua1geDkndLXH...|  38696| dll|        null|     -1|     -1|
+--------------------+-------+----+------------+-------+-------+



<botocore.response.StreamingBody object at 0x7f676a6f0370>
                                                                                

In [67]:
# DB Settings
jdbc_url = 'jdbc:mysql://db/nord_files'
table = "files_info"
username = "root"
password = "password"
driver = "com.mysql.cj.jdbc.Driver"

# Redis settings - on spark session level


In [71]:
# Store result to DB
parsedDF.write.format('jdbc').options(
    url=jdbc_url, driver=driver,dbtable=table, user=username, password=password
).mode('append').save()


<botocore.response.StreamingBody object at 0x7f676a5e7d30>          (0 + 1) / 1]
<botocore.response.StreamingBody object at 0x7f6769a29ae0>
<botocore.response.StreamingBody object at 0x7f676a330df0>
<botocore.response.StreamingBody object at 0x7f676a3a14e0>
<botocore.response.StreamingBody object at 0x7f676a445de0>
<botocore.response.StreamingBody object at 0x7f676e7f0190>
<botocore.response.StreamingBody object at 0x7f6770890a00>
<botocore.response.StreamingBody object at 0x7f676e81fcd0>
<botocore.response.StreamingBody object at 0x7f676a4db460>
<botocore.response.StreamingBody object at 0x7f6769f00640>
22/12/12 15:50:58 ERROR Executor: Exception in task 0.0 in stage 255.0 (TID 137)
java.sql.BatchUpdateException: Duplicate entry '/0/XhleoQ8IR9jvUlxamyFwE2ZZtPp7AKbU.exe' for key 'files_info.PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.De

Py4JJavaError: An error occurred while calling o931.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 255.0 failed 1 times, most recent failure: Lost task 0.0 in stage 255.0 (TID 137) (0d2e0f8f47fa executor driver): java.sql.BatchUpdateException: Duplicate entry '/0/XhleoQ8IR9jvUlxamyFwE2ZZtPp7AKbU.exe' for key 'files_info.PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '/0/XhleoQ8IR9jvUlxamyFwE2ZZtPp7AKbU.exe' for key 'files_info.PRIMARY'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1061)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:795)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1020)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1018)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:888)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:69)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.GeneratedMethodAccessor95.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.BatchUpdateException: Duplicate entry '/0/XhleoQ8IR9jvUlxamyFwE2ZZtPp7AKbU.exe' for key 'files_info.PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '/0/XhleoQ8IR9jvUlxamyFwE2ZZtPp7AKbU.exe' for key 'files_info.PRIMARY'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1061)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:795)
	... 16 more


In [72]:
# Store results also to Redis cache
parsedDF.select(["path","size", "type"]).write.format("org.apache.spark.sql.redis").option("table","s3").option("key.column", "path").mode('append').save()


<botocore.response.StreamingBody object at 0x7f676a376680>          (0 + 1) / 1]
<botocore.response.StreamingBody object at 0x7f6770afd7e0>
<botocore.response.StreamingBody object at 0x7f676a488580>
<botocore.response.StreamingBody object at 0x7f67709cd1b0>
<botocore.response.StreamingBody object at 0x7f67709e9ab0>
<botocore.response.StreamingBody object at 0x7f676a507f70>
<botocore.response.StreamingBody object at 0x7f6769e906a0>
<botocore.response.StreamingBody object at 0x7f676a507ac0>
<botocore.response.StreamingBody object at 0x7f6769ecd5d0>
<botocore.response.StreamingBody object at 0x7f6769c1c310>
22/12/12 22:39:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 20207870 ms exceeds timeout 120000 ms
22/12/12 22:39:59 WARN SparkContext: Killing executors is not supported by current scheduler.
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 46002)
Traceback (most recent call last):
  File "/usr