In [1]:
import pymongo
from sklearn.ensemble import IsolationForest
import numpy as np
import pandas as pd
import os
os.chdir('..')
from credentials.credentials import mongo_uri

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import DoubleType, LongType, TimestampType

In [2]:
# MongoDB connection details
MONGO_URI = mongo_uri
DATABASE = "gastospublicos"
COLLECTION_NAME = 'notasfiscais'
TARGET_COLLECTION = 'slv_notasfiscais'

In [5]:
def init_spark():
    mongo_conn = MONGO_URI

    conf = SparkConf()

    # Download mongo-spark-connector and its dependencies.
    # This will download all the necessary jars and put them in your $HOME/.ivy2/jars, no need to manually download them :
    conf.set("spark.jars.packages",
             "org.mongodb.spark:mongo-spark-connector:10.0.1")

    # Set up read connection :
    conf.set("spark.mongodb.read.connection.uri", mongo_conn)
    conf.set("spark.mongodb.read.database", f"{MONGO_URI}/{DATABASE}")
    conf.set("spark.mongodb.read.collection", f"{MONGO_URI}/{DATABASE}.{COLLECTION_NAME}")

    # Set up write connection
    conf.set("spark.mongodb.write.connection.uri", mongo_conn)
    conf.set("spark.mongodb.write.database", f"{MONGO_URI}/{DATABASE}")
    conf.set("spark.mongodb.write.collection", f"{MONGO_URI}/{DATABASE}.{TARGET_COLLECTION}")
    # If you need to update instead of inserting :
    conf.set("spark.mongodb.write.operationType", "update")

    SparkContext(conf=conf)


In [7]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("MongoDB Data Transformation").getOrCreate() \
#   .config("spark.mongodb.input.uri", f"{MONGO_URI}/{DATABASE}.{COLLECTION_NAME}") \
#    .config("spark.mongodb.output.uri", f"{MONGO_URI}/{DATABASE}.{TARGET_COLLECTION}") \
#    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.2') \
    

# Read data from MongoDB
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Data type conversions and transformations
df = df.withColumn("_id", col("_id").cast(LongType())) \
       .withColumn("DATA EMISSÃO", to_timestamp("DATA EMISSÃO")) \
       .withColumn("DATA/HORA EVENTO MAIS RECENTE", to_timestamp("DATA/HORA EVENTO MAIS RECENTE")) \
       .withColumn("VALOR NOTA FISCAL", col("VALOR NOTA FISCAL").cast(DoubleType())) 

# Add other necessary conversions based on your schema (e.g., for 'CNPJ', 'INSCRIÇÃO ESTADUAL', etc.)

# Write the transformed data to a new MongoDB collection
#df.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()


Py4JJavaError: An error occurred while calling o42.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: com.mongodb.spark.sql.DefaultSource. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


In [3]:
client = pymongo.MongoClient(MONGO_URI)
db = client[DATABASE]
collection = db[COLLECTION_NAME]

features = ["VALOR NOTA FISCAL", # Replace/add features as needed.
            'CPF/CNPJ Emitente',
            'UF EMITENTE',
            'CNPJ DESTINATÁRIO',
            'DATA/HORA EVENTO MAIS RECENTE',
            'UF DESTINATÁRIO'
           ]
data_from_mongo = collection.find({}, {feature: 1 for feature in features}) # Project only the required features

# Convert to DataFrame
df = pd.DataFrame(list(data_from_mongo))




: 

In [None]:
# 2. Data Preprocessing and Feature Engineering (Same as before):

# Select relevant numeric features
numeric_features = ['VALOR NOTA FISCAL', # List your numeric features
                    ]

data = df[numeric_features].copy()

# Handle missing values (if any)
data.fillna(data.mean(), inplace=True)  # Or use another strategy

# Feature scaling
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)

# 3. Isolation Forest Model (Same as before):

model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
model.fit(scaled_data)


# 4. Add Anomaly Scores and Predictions back to the DataFrame:

df['anomaly_score'] = model.decision_function(scaled_data)
df['anomaly'] = model.predict(scaled_data)



# 5.  Analysis and Investigation (Similar as before, but with MongoDB integration):

anomalies = df[df['anomaly'] == -1]

# ... (Analysis as before: grouping, analyzing distributions, etc.)


# Example: Update the MongoDB collection with anomaly flags:

#for index, row in anomalies.iterrows():
 #   collection.update_one({'_id': row['_id']}, {'$set': {'anomaly': True}}) # Mark as anomaly in MongoDB

# Or store anomaly results in a separate collection


# 6. Refinement and Iteration:  (As before)