In [0]:
%sh 
# Dump the runtime info file of the REPL VM.
# NOTE(review): presumably the cluster id is one of the fields in this JSON —
# confirm against the actual output.
cat /databricks/runtime/info.json

In [0]:
import socket

# Get hostname
hostname = socket.gethostname()

# Resolve the hostname to an IPv4 address. This is best effort: besides possibly
# returning 127.0.0.1, the lookup can fail outright (socket.gaierror, a subclass
# of OSError) when the hostname has no DNS/hosts entry. Fall back to a
# placeholder so the rest of the cell still reports hostname and cluster id.
try:
    ip_address = socket.gethostbyname(hostname)
except OSError as exc:
    ip_address = f"<unresolved: {exc}>"

# Cluster id as recorded in the Spark configuration of this session.
clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

print(f"Hostname: {hostname}")
print(f"IP Address: {ip_address}")
print(f"Cluster ID: {clusterId}")

In [0]:
from pyspark.sql.functions import udf, explode, col
from pyspark.sql.types import StringType, ArrayType

In [0]:

@udf
def host_info():
    """Describe the host on which this UDF invocation is evaluated.

    Returns:
        A string of the form ``hostname=<name>, ip=<address>``. When the
        hostname cannot be resolved, ``<address>`` is the placeholder
        ``<unresolved>`` instead of the lookup error failing the Spark job.
    """
    # Imported locally so the name is available in whichever Python process
    # ends up evaluating the UDF.
    import socket

    hostname = socket.gethostname()

    # Best effort: may yield 127.0.0.1, and may raise socket.gaierror (an
    # OSError) if the hostname has no DNS/hosts entry.
    try:
        ip_address = socket.gethostbyname(hostname)
    except OSError:
        ip_address = "<unresolved>"

    return f"hostname={hostname}, ip={ip_address}"

# Ten rows, each annotated with the host that evaluated the UDF for that row.
display(spark.range(10).withColumn("host_info", host_info()))

In [0]:
# Try to enable the debug log from inside a Python UDF and report where it ran.
# NOTE(review): the UDF is evaluated wherever Spark schedules the task, which is
# not guaranteed to be the driver — check the returned hostname to confirm.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Create a 1-row DataFrame
df = spark.createDataFrame([("dummy",)], ["col"])


def get_driver_info(dummy):
    """Enable the debug-log flag on the current host and describe that host.

    Args:
        dummy: Ignored. It exists only so the function can be applied to a
            column as a one-argument UDF.

    Returns:
        A string ``host=<name>, ip=<address>, debug=<flag content>`` where the
        flag content is what is read back from the flag file after writing
        ``true`` to it. ``<address>`` is ``<unresolved>`` if the hostname
        cannot be resolved.

    Raises:
        OSError: If the flag file cannot be written or read.
    """
    import socket

    flag_path = "/Workspace/.proc/self/flags/enable_debug_log"

    hostname = socket.gethostname()
    # The IP is purely informational: a failed lookup (socket.gaierror, an
    # OSError) must not prevent the flag from being written below.
    try:
        ip = socket.gethostbyname(hostname)
    except OSError:
        ip = "<unresolved>"

    with open(flag_path, "w") as f:
        f.write("true")
    # Read the flag back so the result shows the value actually stored.
    with open(flag_path, "r") as f:
        lines = " ".join(f.readlines())

    return f"host={hostname}, ip={ip}, debug={lines}"


# Register it as a UDF
get_driver_info_udf = F.udf(get_driver_info, StringType())

# Apply the UDF
df_with_info = df.withColumn("driver_info", get_driver_info_udf("col"))

# Collect the result. With a single row there is a single UDF call; whether it
# runs on the driver depends on the cluster setup (see the note at the top).
display(df_with_info.collect())

In [0]:
%sh

# Enable the debug log for the REPL VM by writing "true" to its flag file,
# then print the flag file to show the value that is now stored.
echo "true" > /Workspace/.proc/self/flags/enable_debug_log
cat /Workspace/.proc/self/flags/enable_debug_log

In [0]:
%sh

# Print the content of the token file, base64-encoded.
# WARNING: base64 is an encoding, not encryption. The cell output exposes the
# token (presumably a credential — confirm) to anyone who can view this
# notebook; clear the output before sharing or committing it.
cat /Workspace/.proc/self/token | openssl enc -base64

In [0]:
# Enable the debug log on every host that evaluates one of the UDF calls below.
# NOTE(review): 50 rows are presumably meant to spread calls across all
# executors; Spark does not guarantee that every host receives a task.
@udf
def enable_debug_log():
    """Enable the debug-log flag on the current host and describe that host.

    Returns:
        A string ``host=<name>, ip=<address>, debug=<flag content>`` where the
        flag content is what is read back from the flag file after writing
        ``true`` to it. ``<address>`` is ``<unresolved>`` if the hostname
        cannot be resolved.

    Raises:
        OSError: If the flag file cannot be written or read.
    """
    import socket

    flag_path = "/Workspace/.proc/self/flags/enable_debug_log"

    hostname = socket.gethostname()
    # The IP is purely informational: a failed lookup (socket.gaierror, an
    # OSError) must not prevent the flag from being written below.
    try:
        ip = socket.gethostbyname(hostname)
    except OSError:
        ip = "<unresolved>"

    with open(flag_path, "w") as f:
        f.write("true")
    # Read the flag back so the result shows the value actually stored.
    with open(flag_path, "r") as f:
        lines = " ".join(f.readlines())

    return f"host={hostname}, ip={ip}, debug={lines}"

display(spark.range(50).withColumn("enable_debug_log", enable_debug_log()))

In [0]:
@udf(ArrayType(StringType()))
def return_syspath():
    """Return the module search path of the Python process evaluating the UDF.

    Returns:
        The entries of ``sys.path`` as a list of strings.
    """
    from sys import path as search_path
    return search_path

# One output row per sys.path entry.
display(
    spark.range(1)
    .withColumn("syspath", explode(return_syspath()))
)

In [0]:
import os

@udf(ArrayType(StringType()))
def return_info():
    """Return the environment of the Python process evaluating the UDF.

    Returns:
        One ``NAME=value`` string per environment variable.

    Earlier experiments in this cell returned the lines of
    '/databricks/init/info.json' or the Spark conf value
    "spark.databricks.clusterUsageTags.clusterId" instead.
    """
    # Each item is a (name, value) pair of strings, so joining the pair with
    # "=" yields the NAME=value form.
    return list(map("=".join, os.environ.items()))

# NOTE: the column is named "token" but holds environment variables, one per
# row. Environment variables can contain secrets — review the output before
# sharing this notebook.
display(
    spark.range(1)
    .withColumn("token", explode(return_info()))
)

In [0]:
@udf(ArrayType(StringType()))
def return_token():
    """Return the lines of the token file as seen by the process running the UDF.

    Returns:
        The lines of '/Workspace/.proc/self/token', line endings included.

    Raises:
        OSError: If the token file cannot be opened.
    """
    with open('/Workspace/.proc/self/token', 'r', encoding='utf-8') as token_file:
        # Iterating a text file yields its lines, exactly like readlines().
        return list(token_file)

# WARNING: this renders the token in plain text in the cell output; clear the
# output before sharing or committing the notebook.
display(
    spark.range(1)
    .withColumn("token", explode(return_token()))
)

In [0]:
@udf(ArrayType(StringType()))
def return_listdir_udf(folder):
    """List the entries of ``folder`` on the host evaluating the UDF.

    Args:
        folder: Path of the directory to list.

    Returns:
        The names returned by ``os.listdir(folder)``. If the path cannot be
        listed (missing, not a directory, permission denied, ...), a
        single-element list holding a ``<listdir failed: ...>`` message is
        returned, so one bad path no longer fails the whole query and hides
        the listings of the others.
    """
    import os

    try:
        return os.listdir(folder)
    except OSError as exc:
        return [f"<listdir failed: {exc}>"]

# List the content of every sys.path entry that mentions "Workspace".
# Relies on return_syspath() being defined by an earlier cell.
try:
    display(spark.range(1)
            .withColumn("syspath", explode(return_syspath()))
            .filter("syspath like '%Workspace%'")
            .withColumn("list_dir", explode(return_listdir_udf(col("syspath"))))
            )
except Exception as e:
    print(e)  # deliberate best effort: report the error and carry on

In [0]:
import subprocess

@udf(ArrayType(StringType()))
def return_service_logs():
    """Return the lines of the service log as seen by the process running the UDF.

    The file is read by running ``cat`` in a subprocess.
    NOTE(review): an earlier version read the file with ``open()``; the reason
    for switching to ``cat`` is not recorded here.

    Returns:
        The log lines without line endings. If ``cat`` exits with a non-zero
        status, its stderr lines are appended, each prefixed with
        ``[cat exit <code>]``, so a failed read is no longer indistinguishable
        from an empty log.
    """
    log_path = '/Workspace/.proc/self/logs/service'
    result = subprocess.run(
        ['cat', log_path],           # command and arguments
        stdout=subprocess.PIPE,      # capture stdout
        stderr=subprocess.PIPE,      # capture stderr, reported below on failure
        text=True                    # decode bytes to str
    )
    lines = result.stdout.splitlines()
    if result.returncode != 0:
        # Surface the failure as rows instead of silently returning nothing.
        errors = result.stderr.splitlines() or ["(no stderr output)"]
        lines.extend(f"[cat exit {result.returncode}] {msg}" for msg in errors)
    return lines

display(spark.range(1).withColumn("logs", explode(return_service_logs())))

In [0]:
import subprocess

@udf(ArrayType(StringType()))
def return_usage_logs():
    """Return the lines of the usage log as seen by the process running the UDF.

    The file is read by running ``cat`` in a subprocess.
    NOTE(review): an earlier version read a log file with ``open()``; the
    reason for switching to ``cat`` is not recorded here.

    Returns:
        The log lines without line endings. If ``cat`` exits with a non-zero
        status, its stderr lines are appended, each prefixed with
        ``[cat exit <code>]``, so a failed read is no longer indistinguishable
        from an empty log.
    """
    log_path = '/Workspace/.proc/self/logs/usage'
    result = subprocess.run(
        ['cat', log_path],           # command and arguments
        stdout=subprocess.PIPE,      # capture stdout
        stderr=subprocess.PIPE,      # capture stderr, reported below on failure
        text=True                    # decode bytes to str
    )
    lines = result.stdout.splitlines()
    if result.returncode != 0:
        # Surface the failure as rows instead of silently returning nothing.
        errors = result.stderr.splitlines() or ["(no stderr output)"]
        lines.extend(f"[cat exit {result.returncode}] {msg}" for msg in errors)
    return lines

display(spark.range(1).withColumn("logs", explode(return_usage_logs())))

In [0]:
%sh

# Print the service log file as seen from this shell cell.
cat /Workspace/.proc/self/logs/service

In [0]:
%sh

# Print the usage log file as seen from this shell cell.
cat /Workspace/.proc/self/logs/usage

In [0]:
%sh

# Sleep for one hour (3600 s) to keep the cluster alive, so we can log in and
# inspect the logs while it is still running.
sleep 3600