## Process: IIS logs ET and load to parquet
### Env: Microsoft Fabric 
### Tools: Notebooks, Environments, Lakehouse (Tables and Files)
### No built-in libraries: user_agents and geoip2

##### Libraries

In [13]:
from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, DateType, StringType, BooleanType, IntegerType
from pyspark.sql.functions import col, udf, lit, split, year, month
from user_agents import parse
from geoip2 import database
# import requests
# import json

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 16, Finished, Available, Finished)

#### Paths of use

In [14]:
# In
log_path = "Files/logs_upload/*"

# Out
# not necessary, direct connection in Fabric when writing 

# IP Catalog
IP_catalog_path = "Files/catalogos/dbip-city-lite-2023-06.mmdb"

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 17, Finished, Available, Finished)

#### Dataframe Schema definition and df gen

In [15]:
logsSchema = StructType([
    StructField("Date", DateType()),
    StructField("Time", StringType()),
    StructField("ServerIP", StringType()),
    StructField("Method", StringType()),
    StructField("UriStem", StringType()),
    StructField("UriQuery", StringType()),
    StructField("Port", StringType()),
    StructField("UserName", StringType()),
    StructField("ClientIP", StringType()),
    StructField("UserAgent", StringType()),
    StructField("Referer", StringType()),
    StructField("HttpStatus", StringType()),
    StructField("Substatus", StringType()),
    StructField("Win32Status", StringType()),
    StructField("TimeTaken", StringType())
    ])

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 18, Finished, Available, Finished)

In [16]:
LogsDataFrame = spark.read.options(header=False, sep=" ",recursiveFileLookup=True).schema(logsSchema).csv(log_path)

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 19, Finished, Available, Finished)

#### Remove unused registers

In [17]:
LogsDataFrame_cleaned = LogsDataFrame.where(col("HttpStatus") != "undefined").where(col("HttpStatus") != "cs(Referer)")

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 20, Finished, Available, Finished)

#### Filter information about successful requests to aspx pages

In [18]:
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("Method") == "GET")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("HttpStatus").between(200,299))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("TimeTaken") > 0)
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("UriStem") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("Port") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("ClientIP") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("UserAgent") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("Referer") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("HttpStatus") != "-")
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(~ col("UserAgent").like("%bot%"))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.where(col("UriStem").like("%.aspx%"))

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 21, Finished, Available, Finished)

#### Define Functions to determine Operating System (OS), Device, Browser, and Country and Region, the latter two based on the client's IP

In [19]:
def UserAgentOS(stringUserAgent):
    try:
        userAgent = parse(stringUserAgent)
        return userAgent.os.family
    except :
        return ""

def UserAgentDevice(stringUserAgent):
    try:
        userAgent = parse(stringUserAgent)
        return userAgent.device.family
    except :
        return ""

def UserAgentBrowser(stringUserAgent):
    try:
        userAgent = parse(stringUserAgent)
        return userAgent.browser.family
    except :
        return ""

def UserAgentIsMobile(stringUserAgent):
    try:
        userAgent = parse(stringUserAgent)
        return userAgent.is_mobile 
    except :
        return False

UserAgent_OS_UDF = udf(UserAgentOS, StringType()) 
UserAgent_Device_UDF = udf(UserAgentDevice, StringType()) 
UserAgent_Browser_UDF = udf(UserAgentBrowser, StringType()) 
UserAgent_IsMobile_UDF = udf(UserAgentIsMobile, BooleanType()) 

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 22, Finished, Available, Finished)

#### Add columns: OS, Device, Browser, Country, Region

In [20]:
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("OS", UserAgent_OS_UDF( col("UserAgent")))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Device", UserAgent_Device_UDF( col("UserAgent")))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Browser", UserAgent_Browser_UDF( col("UserAgent")))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Is_Mobile", UserAgent_IsMobile_UDF( col("UserAgent")))

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 23, Finished, Available, Finished)

#### Define Function to obtain the Country and Region based on an IP

In [28]:
@udf(returnType=StringType())
def geoip(ip):
    geo = database.Reader(IP_catalog_path)
    
    try:
        result = geo.city(ip)
        
        specific = result.subdivisions
        region = ""
        if(specific) :
            region = specific[0].name

        strignValue = result.country.name + ","+  region + "," + str(result.location.latitude) + "," + str(result.location.longitude)
        return strignValue
    except :
        return " , , , "

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 31, Finished, Available, Finished)

#### Adding Location Fields Based on IP in the Data Frame

In [29]:
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Country", split(geoip( col("ClientIP")),",").getItem(0))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Region", split(geoip( col("ClientIP")),",").getItem(1))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Latitude", split(geoip( col("ClientIP")),",").getItem(2))
LogsDataFrame_cleaned = LogsDataFrame_cleaned.withColumn("Longitude", split(geoip( col("ClientIP")),",").getItem(3))

StatementMeta(, 2ba2daca-8acf-49a2-8d8d-a5938d0a4bb4, 32, Finished, Available, Finished)

In [13]:
LogsDataFrame_cleaned.createOrReplaceTempView("test")

StatementMeta(, 20c02360-4622-4c26-9378-86560f7e37db, 17, Finished, Available, Finished)

In [14]:
%%sql
select * from test limit 10

StatementMeta(, 20c02360-4622-4c26-9378-86560f7e37db, 18, Finished, Available, Finished)

Error: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 15) (vm-d3c19131 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_9360/1204554484.py", line 3, in geoip
  File "/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/geoip2/database.py", line 119, in __init__
    self._db_reader = maxminddb.open_database(fileish, mode)
  File "/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/maxminddb/__init__.py", line 80, in open_database
    return cast(Reader, _extension.Reader(database, mode))
FileNotFoundError: [Errno 2] No such file or directory: b'abfss://7e8183df-7c4d-4b97-b700-576837c90544@onelake.dfs.fabric.microsoft.com/fd4667a5-b1b0-429d-869a-f9d940ad7fae/Files/catalogos/dbip-city-lite-2023-06.mmdb'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:424)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:

#### Store results as Delta Table

In [None]:
# first time, overwrite
LogsDataFrame_cleaned.write.format("delta").mode("overwrite").save("Logs")

# to schedule, append
# LogsDataFrame_cleaned.write.format("delta").mode("append").save("Logs")

StatementMeta(, , , Cancelled, , Cancelled)