# Security Data Lakehouse Getting Started

## Init Environment

In [18]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lakehouse Demo").getOrCreate()

spark

24/07/21 16:41:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [19]:
%%sql

CREATE DATABASE IF NOT EXISTS siem_log

24/07/21 16:41:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [20]:
%%sql

DROP TABLE IF EXISTS siem_log.total_logs

## Insert data from json file 1

In [22]:
from pyspark.sql.types import StructType
spark.conf.set('spark.sql.caseSensitive', True)

path = "/home/iceberg/warehouse/data/data_siem_1.json"
logs_df = spark.read.json(path)
schema = logs_df.schema
fields = sorted(schema.fields, key=lambda f: f.name)
sorted_schema = StructType(fields)
sorted_columns = [field.name for field in sorted_schema.fields]
sorted_logs_df = logs_df.select(sorted_columns[1:])

                                                                                

In [None]:
logs_df.printSchema()

In [24]:
logs_df.write.saveAsTable("siem_log.total_logs")

                                                                                

In [None]:
%%sql

DESCRIBE EXTENDED siem_log.total_logs

In [25]:
%%sql
SELECT COUNT(*) as cnt
FROM siem_log.total_logs

cnt
70402


In [26]:
%%sql
SELECT *
FROM siem_log.total_logs.snapshots

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-07-21 16:41:43.159000,3196237193718442610,,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3196237193718442610-1-2af40039-a306-4c89-bafc-9bff9fd67bd5.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '70402', 'total-position-deletes': '0', 'added-files-size': '10597143', 'total-delete-files': '0', 'total-files-size': '10597143', 'total-records': '70402', 'total-data-files': '8'}"


## Insert data from json file 2

In [27]:
from pyspark.sql.types import StructType

path = "/home/iceberg/warehouse/data/data_siem_2.json"
logs_second_df = spark.read.option("mode", "DROPMALFORMED").json(path)
schema_second = logs_second_df.schema
fields = sorted(schema_second.fields, key=lambda f: f.name)
sorted_schema_second = StructType(fields)
sorted_columns_second = [field.name for field in sorted_schema_second.fields]
# sorted_columns_second[1:]
sorted_logs_second_df = logs_second_df.select(sorted_columns_second[1:])

                                                                                

In [28]:
%%sql
ALTER TABLE siem_log.total_logs SET TBLPROPERTIES (
  'write.spark.accept-any-schema'='true'
)

In [29]:
spark.conf.set('spark.sql.iceberg.check-ordering', False)

sorted_logs_second_df.writeTo("siem_log.total_logs").option("mergeSchema","true").append()

                                                                                

In [30]:
%%sql
SELECT *
FROM siem_log.total_logs.snapshots ORDER BY committed_at desc;

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-07-21 16:42:03.930000,3094708245849372583,3.1962371937184425e+18,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3094708245849372583-1-9618fd08-a299-472d-9609-f0fb5bedc949.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '163425', 'total-position-deletes': '0', 'added-files-size': '22083047', 'total-delete-files': '0', 'total-files-size': '32680190', 'total-records': '233827', 'total-data-files': '16'}"
2024-07-21 16:41:43.159000,3196237193718442610,,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3196237193718442610-1-2af40039-a306-4c89-bafc-9bff9fd67bd5.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '70402', 'total-position-deletes': '0', 'added-files-size': '10597143', 'total-delete-files': '0', 'total-files-size': '10597143', 'total-records': '70402', 'total-data-files': '8'}"


In [31]:
%%sql
SELECT COUNT(*) as cnt
FROM siem_log.total_logs

cnt
233827


## Insert data from json file 3

In [32]:
from pyspark.sql.types import StructType

path = "/home/iceberg/warehouse/data/data_siem_3.json"
logs_third_df = spark.read.option("mode", "DROPMALFORMED").json(path)
schema_third = logs_third_df.schema
fields = sorted(schema_third.fields, key=lambda f: f.name)
sorted_schema_third = StructType(fields)
sorted_columns_third = [field.name for field in sorted_schema_third.fields]
# sorted_columns_third[1:]
sorted_logs_third_df = logs_third_df.select(sorted_columns_third[1:])

                                                                                

In [33]:
sorted_logs_third_df.writeTo("siem_log.total_logs").option("mergeSchema","true").append()

IllegalArgumentException: Cannot change column type: mailto: list<string> -> string

In [34]:
sorted_columns_third = list(filter(lambda x: x != "mailto", sorted_columns_third))
sorted_columns_third = list(filter(lambda x: x != "rcpt_to", sorted_columns_third))
sorted_logs_third_df = logs_third_df.select(sorted_columns_third[1:])
sorted_logs_third_df.writeTo("siem_log.total_logs").option("mergeSchema","true").append()

                                                                                

In [35]:
%%sql
SELECT *
FROM siem_log.total_logs.snapshots ORDER BY committed_at desc;

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-07-21 16:42:29.125000,3032922774633318531,3.094708245849372e+18,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3032922774633318531-1-9a45e3ac-9f34-4d10-aa02-5463ab2e748c.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '210734', 'total-position-deletes': '0', 'added-files-size': '27234109', 'total-delete-files': '0', 'total-files-size': '59914299', 'total-records': '444561', 'total-data-files': '24'}"
2024-07-21 16:42:03.930000,3094708245849372583,3.1962371937184425e+18,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3094708245849372583-1-9618fd08-a299-472d-9609-f0fb5bedc949.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '163425', 'total-position-deletes': '0', 'added-files-size': '22083047', 'total-delete-files': '0', 'total-files-size': '32680190', 'total-records': '233827', 'total-data-files': '16'}"
2024-07-21 16:41:43.159000,3196237193718442610,,append,s3a://warehouse/wh/siem_log/total_logs_48d30eed-9822-4003-97e8-a3a4ec9f360b/metadata/snap-3196237193718442610-1-2af40039-a306-4c89-bafc-9bff9fd67bd5.avro,"{'spark.app.id': 'local-1721579760824', 'changed-partition-count': '1', 'added-data-files': '8', 'total-equality-deletes': '0', 'added-records': '70402', 'total-position-deletes': '0', 'added-files-size': '10597143', 'total-delete-files': '0', 'total-files-size': '10597143', 'total-records': '70402', 'total-data-files': '8'}"


## Doing Stuff with Spark

In [36]:
%%sql
SELECT COUNT(*) as cnt
FROM siem_log.total_logs

cnt
444561


In [37]:
%%sql
SELECT event_id, log_parser, device_product, timestamp
FROM siem_log.total_logs WHERE device_product = 'netad'

event_id,log_parser,device_product,timestamp
28267794-41ff-4b99-a2b3-4e8461e65964,netad,netad,1704965390594
ff69bb02-aff0-4a9f-b653-5aa7d2788916,netad,netad,1704965390599
de7dfce0-4872-4184-b766-c3893f12115d,netad,netad,1704965390600
3b5a502b-54e7-47eb-abee-e77c3564b5b7,netad,netad,1704965390524
de32099e-9c6f-46aa-90f7-693cee37a495,netad,netad,1704965390618
7c4f466b-fa8c-4cba-a7c4-f07316766162,netad,netad,1704965390599
3d8a200e-9d7c-4210-87fa-c8c3982bf6fd,netad,netad,1704965390599
3cdf6d20-302e-4c56-88e4-551311927d7c,netad,netad,1704965390599
cb4c04f5-e40d-452d-8658-08d2edd78b68,netad,netad,1704965390603
caea9bc3-c71a-4855-b3c2-4a804febf9b2,netad,netad,1704965390576


## Doing Stuff with Trino

In [None]:
%load_ext sql
%sql trino://user@trino:8080/demo --alias trino

In [1]:
%load_ext sql

Deploy Panel apps for free on Ploomber Cloud! Learn more: https://ploomber.io/s/signup


In [2]:
%sql spark

In [6]:
%%sql
SELECT COUNT(*) as cnt
FROM siem_log.total_logs
-- Cant handling with case-sensitive column in iceberg format????? Trino is broken


UsageError: Cell magic `%%sparksql` not found.


In [5]:
# save the available connections to a dictionary
%sql -l

current,url,alias
*,,SparkSession


In [8]:
%sql spark

In [102]:
%%sql
SELECT COUNT(*) as cnt
FROM siem_log.total_logs

RuntimeError: (trino.exceptions.TrinoQueryError) TrinoQueryError(type=INTERNAL_ERROR, name=GENERIC_INTERNAL_ERROR, message="Multiple entries with same key: user_logon_id=724:user_logon_id:varchar and user_logon_id=722:user_logon_Id:varchar", query_id=20240721_143610_00050_q7e8x)
[SQL: SELECT COUNT(*) as cnt
FROM siem_log.total_logs]
(Background on this error at: https://sqlalche.me/e/20/dbapi)
If you need help solving this issue, send us a message: https://ploomber.io/community


In [89]:
%%sql
SELECT event_id,timestamp,device_product,log_parser
FROM siem_log.total_logs WHERE device_product = 'VCS-aJiant'

CPU times: user 13.8 ms, sys: 5.27 ms, total: 19.1 ms
Wall time: 554 ms


event_id,timestamp,device_product,log_parser
80879d6e-f437-4e17-b82b-c632ad10aecb,1704965449487,VCS-aJiant,cef_default
f2999d94-fe1e-4490-8016-a39289134ab0,1704965449491,VCS-aJiant,cef_default
b2398366-202b-4aed-b4d1-2cf0a62ea281,1704965449442,VCS-aJiant,cef_default
0cc6e83b-57fc-43dd-92ec-1858f0060b81,1704965449457,VCS-aJiant,cef_default
563c60dd-df0a-485a-a231-90b52bf8912f,1704965449520,VCS-aJiant,cef_default
372eb4aa-8e58-4679-ac25-2d4b3ede5f8d,1704965449511,VCS-aJiant,cef_default
f4bcd1d5-88fb-4b8c-8e34-83610913a704,1704965449555,VCS-aJiant,cef_default
e82510a3-d4a0-4766-95fd-0fe08fad1be9,1704965449469,VCS-aJiant,cef_default
e54bc742-5fd3-4aef-99d7-4501292a5e2c,1704965449570,VCS-aJiant,cef_default
dc583d90-7b81-4efe-8b44-d2ec016ebc60,1704965449510,VCS-aJiant,cef_default
