In [13]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import docker

import requests

import logging
import os
import subprocess
import sys

import shutil

from collections import defaultdict

In [None]:
from new_project_pipeline import predict_tags_for_new_project

In [None]:
# Call the imported pipeline helper on the sample repository URL.
predict_tags_for_new_project("https://github.com/zuevmaxim/itmo-ibd.git")

In [None]:
# Obtain the Spark session for this notebook, creating one if none is active.
spark = SparkSession.builder.appName("Handle new project pipline").getOrCreate()

In [None]:
# Call the imported pipeline helper on the sample repository URL.
predict_tags_for_new_project("https://github.com/zuevmaxim/itmo-ibd.git")

In [14]:
# Obtain the Spark session for this notebook, creating one if none is active.
spark = SparkSession.builder.appName("Handle new project pipline").getOrCreate()

In [15]:
def create_dir(dir_path: str) -> None:
    """Create ``dir_path`` (including missing parents) if it does not exist yet.

    The call is idempotent: an already existing directory is left untouched.
    ``exist_ok=True`` lets ``os.makedirs`` handle the "already there" case
    itself, which avoids the check-then-create race of a separate
    ``os.path.exists`` test.

    :param dir_path: directory to create.
    :raises FileExistsError: if ``dir_path`` exists but is not a directory.
    """
    os.makedirs(dir_path, exist_ok=True)

In [16]:
def remove_dir(dir_path: str):
    """Delete the directory tree rooted at ``dir_path``.

    Thin wrapper around ``shutil.rmtree``; any error it raises (for example
    when the path does not exist) propagates to the caller.

    :param dir_path: root of the directory tree to delete.
    """
    shutil.rmtree(dir_path, ignore_errors=False)

In [17]:
def repo_name_from_clone_link(clone_link: str) -> str:
    """Return the repository name encoded in a git clone URL.

    Only a trailing ``.git`` suffix is stripped, so names that merely contain
    ``.git`` (e.g. ``user.github.io.git``) are kept intact. A trailing slash
    on the URL is ignored.

    :param clone_link: clone URL such as ``https://host/owner/repo.git``.
    :return: the last path segment without its ``.git`` suffix.
    """
    last_segment = clone_link.rstrip("/").split("/")[-1]
    if last_segment.endswith(".git"):
        return last_segment[:-len(".git")]
    return last_segment


# Directory that holds cloned repositories; each project gets a sub-directory
# named after the repository.
PATH_TO_CLONE_REPO = "./test_project"
GIT_CLONE_LINK = "https://github.com/zuevmaxim/itmo-ibd.git"
# The owner is the URL path segment right before the repository name.
PROJECT_OWNER = GIT_CLONE_LINK.rstrip("/").split("/")[-2]
PROJECT_NAME = repo_name_from_clone_link(GIT_CLONE_LINK)
PROJECT_PATH = os.path.join(PATH_TO_CLONE_REPO, PROJECT_NAME)

In [18]:
# Make sure the clone target directory exists before the repository is cloned into it.
create_dir(PROJECT_PATH)

In [19]:
# Shallow-clone the repository (latest commit only) into PROJECT_PATH.
clone_result = subprocess.run(
    ['git', 'clone', '--depth', '1', GIT_CLONE_LINK, PROJECT_PATH]
)
return_code = clone_result.returncode
if return_code != 0:
    # Log at ERROR level: the root logger's default WARNING threshold would
    # drop an INFO record and hide the failure.
    logging.error(f'Error while cloning {GIT_CLONE_LINK}!')
    # Raise instead of calling exit(): the notebook run stops at this cell
    # and the failure stays visible as a traceback.
    raise RuntimeError(f'git clone failed with exit code {return_code}')

Cloning into './test_project/itmo-ibd'...
[KUpdating files: 100% (160/160), done.


In [20]:
# Count how many files of each extension exist under the cloned project.
cont_extensions = defaultdict(int)
for _dir_path, _dir_names, file_names in os.walk(PROJECT_PATH):
    for file_name in file_names:
        cont_extensions[os.path.splitext(file_name)[1]] += 1

# One (project, extension, count) tuple per distinct extension.
extensions_metrics = [
    (f"{PROJECT_NAME}", ext, n_files) for ext, n_files in cont_extensions.items()
]

In [None]:
# Turn the per-extension tuples into a cached Spark DataFrame with named columns.
extensions_metrics_dataset = (
    spark.createDataFrame(extensions_metrics)
    .toDF("project_name", "extension", "count")
    .cache()
)

In [None]:
# Print a preview of the per-extension counts.
extensions_metrics_dataset.show()

In [None]:
def rename_extension(package_name):
    """Prefix the given value with ``extension#``.

    :param package_name: value to prefix; it is formatted into the result as-is.
    :return: the string ``extension#<package_name>``.
    """
    prefixed = f"extension#{package_name}"
    return prefixed

# Spark UDF wrapper around rename_extension; it returns a string column.
udf_rename_extension = F.udf(rename_extension, StringType())

In [None]:
# Replace the "extension" column with its "extension#"-prefixed form; the
# other columns and the column order stay as they are.
extensions_metrics_dataset = extensions_metrics_dataset.withColumn(
    "extension", udf_rename_extension("extension")
)
extensions_metrics_dataset.show()

In [None]:
# Host directories that receive the Lupa output. They default to the original
# machine-specific locations and can be overridden through environment
# variables of the same name, without editing the notebook.
# TODO: generate unique folders on every run (not implemented yet).
PATH_TO_LUPA_KOTLIN_OUTPUT = os.environ.get(
    "PATH_TO_LUPA_KOTLIN_OUTPUT",
    "/home/Dmitry.Pogrebnoy/Desktop/tmp_lupa_kotlin_output",
)
PATH_TO_LUPA_PYTHON_OUTPUT = os.environ.get(
    "PATH_TO_LUPA_PYTHON_OUTPUT",
    "/home/Dmitry.Pogrebnoy/Desktop/tmp_lupa_python_output",
)

In [None]:
# Make sure both Lupa output directories exist on the host.
for lupa_output_dir in (PATH_TO_LUPA_KOTLIN_OUTPUT, PATH_TO_LUPA_PYTHON_OUTPUT):
    create_dir(lupa_output_dir)

In [None]:
# Bind mounts for the Lupa container: host path -> mount point and access mode.
# Docker only bind-mounts absolute host paths; a relative key such as
# "./test_project" is not treated as a host directory, so every key is
# resolved to an absolute path first.
docker_volumes = {
    os.path.abspath(PATH_TO_CLONE_REPO): {'bind': '/data', 'mode': 'ro'},
    os.path.abspath(PATH_TO_LUPA_PYTHON_OUTPUT): {'bind': '/output_python', 'mode': 'rw'},
    os.path.abspath(PATH_TO_LUPA_KOTLIN_OUTPUT): {'bind': '/output_kotlin', 'mode': 'rw'},
}

In [None]:
# Run the Lupa image over the mounted volumes to extract imports.
docker_client = docker.from_env()
docker_client.containers.run(
    'pogrebnoy/ibd-lupa-extract-imports:1.0.0',
    volumes=docker_volumes,
    stderr=True,
    auto_remove=True,
    # Not passed: user=f"{os.getuid()}"
    # It fails lupa with: Exception in thread "main" java.lang.RuntimeException:
    # Could not create parent directory for lock file
    # /Lupa/?/.gradle/wrapper/dists/gradle-6.8.3-bin/7ykxq50lst7lb7wx1nijpicxn/gradle-6.8.3-bin.zip.lck
)

In [None]:
# Load the Python import statements written to the Lupa Python output directory.
python_imports_csv = os.path.join(PATH_TO_LUPA_PYTHON_OUTPUT, "import_statements_data.csv")
python_imports_dataset = spark.read.csv(python_imports_csv, header=True).cache()
python_imports_dataset.show()

In [None]:
# Load the Kotlin import directives written to the Lupa Kotlin output directory.
kotlin_imports_csv = os.path.join(PATH_TO_LUPA_KOTLIN_OUTPUT, "import_directives_data.csv")
kotlin_imports_dataset = spark.read.csv(kotlin_imports_csv, header=True).cache()
kotlin_imports_dataset.show()

In [None]:
# Stack the Python and Kotlin import rows into one cached dataset.
# NOTE(review): union() matches columns by position, not by name — confirm
# that both CSV files share the same column order.
imports_dataset = python_imports_dataset.union(kotlin_imports_dataset).cache()
imports_dataset.show()

In [None]:
# CSV that maps an import to its package. Defaults to the original
# machine-specific location; override it with an environment variable of the
# same name when running elsewhere.
PATH_TO_IMPORT_TO_PACKAGE_DATASET = os.environ.get(
    "PATH_TO_IMPORT_TO_PACKAGE_DATASET",
    "/home/Dmitry.Pogrebnoy/Desktop/itmo-ibd/data/full_import_dataset/lupa_import_grouping/output/import_by_package.csv",
)

In [None]:
# Load the import -> package mapping from its CSV file (first line is the header).
import_to_package_dataset = spark.read.csv(
    path=PATH_TO_IMPORT_TO_PACKAGE_DATASET,
    header=True,
)
import_to_package_dataset.show()

In [None]:
# Build an import -> package lookup dict from the mapping dataset.
# The pandas copy gets its own name: rebinding import_to_package_dataset to a
# pandas frame would make this cell fail on a second run, because pandas
# frames have no .toPandas().
import_to_package_pandas = import_to_package_dataset.toPandas()
import_to_package_dict = dict(
    zip(import_to_package_pandas["import"], import_to_package_pandas["package"])
)
import_to_package_dict

In [None]:
def get_package_by_import(lib_import):
    """Map an import to its package using ``import_to_package_dict``.

    :param lib_import: import to look up.
    :return: the mapped package, or ``lib_import`` itself when there is no entry.
    """
    return import_to_package_dict.get(lib_import, lib_import)

# Spark UDF wrapper around get_package_by_import; it returns a string column.
map_import_to_package = F.udf(get_package_by_import, StringType())

In [None]:
# Keep every existing column and append the package resolved for each import.
package_column = map_import_to_package("import").alias("package")
full_import_dataset = imports_dataset.select("*", package_column).cache()
full_import_dataset.show()

In [None]:
# Make final dataset

In [None]:
# Count the import rows for every (project, package) pair.
intermediate_dataframe = (
    full_import_dataset
    .groupBy("project_name", "package")
    .agg(F.count("*").alias("count_different_import"))
)

In [None]:
# Print a preview of the per-package import counts.
intermediate_dataframe.show()

In [None]:
def rename_package(package_name):
    """Prefix the given package name with ``package#``.

    :param package_name: value to prefix; it is formatted into the result as-is.
    :return: the string ``package#<package_name>``.
    """
    prefixed = f"package#{package_name}"
    return prefixed

# Spark UDF wrapper around rename_package; it returns a string column.
udf_rename_package = F.udf(rename_package, StringType())

In [None]:
# Prefix package names with "package#". Only project_name and package are
# selected, so count_different_import is not carried forward.
# NOTE(review): confirm that dropping the count here is intended.
renamed_package = udf_rename_package("package").alias("package")
intermediate_dataframe = intermediate_dataframe.select("project_name", renamed_package).cache()
intermediate_dataframe.show()

In [None]:
# One row per project and one column per package; each cell holds the number
# of rows for that (project, package) pair.
packages_by_project = intermediate_dataframe.groupby("project_name").pivot("package")
pivot_package_dataframe = packages_by_project.agg(F.count("*"))
pivot_package_dataframe.show()

In [None]:
# One row per project and one column per extension; each cell holds the first
# "count" value found for that (project, extension) pair.
extensions_by_project = extensions_metrics_dataset.groupby("project_name").pivot("extension")
pivot_ext_count_dataset = extensions_by_project.agg(F.first("count"))
pivot_ext_count_dataset.show()

In [None]:
# Combine package and extension features per project (default inner join on project_name).
final_dataset = pivot_package_dataframe.join(
    pivot_ext_count_dataset,
    on="project_name",
)
final_dataset.show()

In [None]:
# Take the first row of the joined dataset as a plain dict (column name -> value).
first_row = final_dataset.collect()[0]
final_dataset_dict = first_row.asDict(recursive=True)
final_dataset_dict

In [None]:
# CSV listing the feature column names. Defaults to the original
# machine-specific location; override it with an environment variable of the
# same name when running elsewhere.
PATH_TO_COLUMN_DATASET = os.environ.get(
    "PATH_TO_COLUMN_DATASET",
    "/home/Dmitry.Pogrebnoy/Desktop/itmo-ibd/data/pipeline/final_columns.csv",
)

In [None]:
# Feature column names, read from the "column_name" column of the columns file.
columns_dataset = (
    spark.read.csv(PATH_TO_COLUMN_DATASET, header=True)
    .toPandas()["column_name"]
    .to_list()
)
# Build the feature list in that order; columns missing from this project's row become 0.
final_data_for_prediction = [
    final_dataset_dict.get(column_name, 0) for column_name in columns_dataset
]

In [None]:
# Display the assembled feature list.
final_data_for_prediction

In [None]:
# Sanity check: sum every entry after the first one; a non-zero result shows
# that the feature list is not all zeros.
sum(final_data_for_prediction[1:])

In [None]:
# Next, pass the data to the predictor; that completes the pipeline.

In [None]:
# TODO: placeholder cell — replace it with the actual predictor call.

In [None]:
# CSV listing the tag names. Defaults to the original machine-specific
# location; override it with an environment variable of the same name when
# running elsewhere.
PATH_TO_TAG_DATASET = os.environ.get(
    "PATH_TO_TAG_DATASET",
    "/home/Dmitry.Pogrebnoy/Desktop/itmo-ibd/data/pipeline/final_tags.csv",
)

In [None]:
# Read the tag names from the tag file (PATH_TO_TAG_DATASET), not from the
# feature-columns file, and keep the "tag_name" column as a Python list.
tags_dataset = (
    spark.read.csv(PATH_TO_TAG_DATASET, header=True)
    .toPandas()["tag_name"]
    .to_list()
)

In [None]:
# Spot-check a few entries of the loaded tag list.
for tag_index in (0, 10, 20):
    print(tags_dataset[tag_index])