Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sunshowers committed Feb 22, 2012
0 parents commit a51b2de
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 0 deletions.
Empty file added src/__init__.py
Empty file.
132 changes: 132 additions & 0 deletions src/fs.py
@@ -0,0 +1,132 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

import ctypes
from ctypes import POINTER, WinError, sizeof, byref
from ctypes.wintypes import DWORD, HANDLE, BOOL

LPDWORD = POINTER(DWORD)

GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000

FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
FILE_SHARE_DELETE = 0x00000004

FILE_SUPPORTS_HARD_LINKS = 0x00400000
FILE_SUPPORTS_REPARSE_POINTS = 0x00000080

FILE_ATTRIBUTE_DIRECTORY = 0x00000010
FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400

FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000

OPEN_EXISTING = 3

MAX_PATH = 260

INVALID_HANDLE_VALUE = -1

class FILETIME(ctypes.Structure):
_fields_ = [("dwLowDateTime", DWORD),
("dwHighDateTime", DWORD)]

class BY_HANDLE_FILE_INFORMATION(ctypes.Structure):
_fields_ = [("dwFileAttributes", DWORD),
("ftCreationTime", FILETIME),
("ftLastAccessTime", FILETIME),
("ftLastWriteTime", FILETIME),
("dwVolumeSerialNumber", DWORD),
("nFileSizeHigh", DWORD),
("nFileSizeLow", DWORD),
("nNumberOfLinks", DWORD),
("nFileIndexHigh", DWORD),
("nFileIndexLow", DWORD)]

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858
CreateFile = ctypes.windll.kernel32.CreateFileW
CreateFile.argtypes = [ctypes.c_wchar_p, DWORD, DWORD, ctypes.c_void_p,
DWORD, DWORD, HANDLE]
CreateFile.restype = HANDLE

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa364944
GetFileAttributes = ctypes.windll.kernel32.GetFileAttributesW
GetFileAttributes.argtypes = [ctypes.c_wchar_p]
GetFileAttributes.restype = DWORD

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa364952
GetFileInformationByHandle = ctypes.windll.kernel32.GetFileInformationByHandle
GetFileInformationByHandle.argtypes = [HANDLE, POINTER(BY_HANDLE_FILE_INFORMATION)]
GetFileInformationByHandle.restype = BOOL

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa364996
GetVolumePathName = ctypes.windll.kernel32.GetVolumePathNameW
GetVolumePathName.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p, DWORD]
GetVolumePathName.restype = BOOL

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa364993
GetVolumeInformation = ctypes.windll.kernel32.GetVolumeInformationW
GetVolumeInformation.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p, DWORD,
LPDWORD, LPDWORD, LPDWORD, ctypes.c_wchar_p,
DWORD]
GetVolumeInformation.restype = BOOL

# http://msdn.microsoft.com/en-us/library/windows/desktop/aa363216
DeviceIoControl = ctypes.windll.kernel32.DeviceIoControl
DeviceIoControl.argtypes = [HANDLE, DWORD, ctypes.c_void_p, DWORD,
ctypes.c_void_p, DWORD, LPDWORD, ctypes.c_void_p]
DeviceIoControl.restype = BOOL

# http://msdn.microsoft.com/en-us/library/windows/desktop/ms724211
CloseHandle = ctypes.windll.kernel32.CloseHandle
CloseHandle.argtypes = [HANDLE]
CloseHandle.restype = BOOL

def getfileinfo(path):
"""
Return information for the file at the given path. This is going to be a
struct of type BY_HANDLE_FILE_INFORMATION.
"""
hfile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, None, OPEN_EXISTING, 0, None)
if hfile is None:
raise WinError()
info = BY_HANDLE_FILE_INFORMATION()
rv = GetFileInformationByHandle(hfile, info)
if rv == 0:
raise WinError()
return info

def getvolumeinfo(path):
"""
Return information for the volume containing the given path. This is going
to be a pair containing (file system, file system flags).
"""

# Add 1 for a trailing backslash if necessary, and 1 for the terminating
# null character.
volpath = ctypes.create_unicode_buffer(len(path) + 2)
rv = GetVolumePathName(path, volpath, len(volpath))
if rv == 0:
raise WinError()

fsnamebuf = ctypes.create_unicode_buffer(MAX_PATH + 1)
fsflags = DWORD(0)
rv = GetVolumeInformation(volpath, None, 0, None, None, byref(fsflags),
fsnamebuf, len(fsnamebuf))
if rv == 0:
raise WinError()

return (fsnamebuf.value, fsflags.value)

def hardlinks_supported(path):
(fsname, fsflags) = getvolumeinfo(path)
# FILE_SUPPORTS_HARD_LINKS isn't supported until Windows 7, so also check
# whether the file system is NTFS
return bool((fsflags & FILE_SUPPORTS_HARD_LINKS) or (fsname == "NTFS"))

def junctions_supported(path):
(fsname, fsflags) = getvolumeinfo(path)
return bool(fsflags & FILE_SUPPORTS_REPARSE_POINTS)
35 changes: 35 additions & 0 deletions src/hardlink.py
@@ -0,0 +1,35 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# Library to deal with hardlinks

__all__ = ["create", "samefile"]

import fs
import ctypes
from ctypes import WinError
from ctypes.wintypes import BOOL
CreateHardLink = ctypes.windll.kernel32.CreateHardLinkW
CreateHardLink.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_void_p]
CreateHardLink.restype = BOOL

def create(source, link_name):
"""
Creates a hardlink at link_name referring to the same file as source.
"""
res = CreateHardLink(link_name, source, None)
if rv == 0:
raise WinError("Couldn't create hardlink from %s to %s" %
(source, link_name))

def samefile(path1, path2):
"""
Returns True if path1 and path2 refer to the same file.
"""
# Check if both are on the same volume and have the same file ID
info1 = fs.getfileinfo(path1)
info2 = fs.getfileinfo(path2)
return (info1.dwVolumeSerialNumber == info2.dwVolumeSerialNumber and
info1.nFileIndexHigh == info2.nFileIndexHigh and
info1.nFileIndexLow == info2.nFileIndexLow)
192 changes: 192 additions & 0 deletions src/junction.py
@@ -0,0 +1,192 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# Python module to create, delete and get the target of junctions on
# Windows.

__all__ = ["create", "readlink", "unlink"]

import os
import fs
from fs import CreateFile, GetFileAttributes, DeviceIoControl, CloseHandle

import ctypes
from ctypes import WinError, sizeof, byref
from ctypes.wintypes import DWORD

IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003

FSCTL_SET_REPARSE_POINT = 0x000900A4
FSCTL_GET_REPARSE_POINT = 0x000900A8
FSCTL_DELETE_REPARSE_POINT = 0x000900AC

def new_junction_reparse_buffer(path=None):
"""
Given a path, return a pair containing a new REPARSE_DATA_BUFFER and the
length of the buffer (not necessarily the same as sizeof due to packing
issues).
If no path is provided, the maximum length is assumed.
"""

if path is None:
# The maximum reparse point data buffer length is 16384 bytes. We are a
# bit conservative here and set a length of 16000 bytes (8000
# characters) + a few more for the header.
substnamebufferchars = 8000
else:
# 1 more character for the null terminator. Python 2.x calculates
# len(surrogate pair) = 2, so multiplying this by 2 is the right thing
# to do.
substnamebufferchars = len(path) + 1

# It is amazing how ugly MSDN's version of REPARSE_DATA_BUFFER is:
# <http://msdn.microsoft.com/en-us/library/windows/hardware/ff552012>. It
# is a variable-length struct with two strings in the wchar[] buffer at
# the end. Both are supposed to be null-terminated, and the individual
# lengths do not include that of the null character, but the total
# ReparseDataLength does.
#
# In our case, only the SubstituteName part of the mount point/junction-
# specific part is relevant. So we set PrintNameLength to 0, but we still
# need to allow for one null character, so PrintNameBuffer has length 1.
class REPARSE_DATA_BUFFER(ctypes.Structure):
_fields_ = [("ReparseTag", ctypes.c_ulong),
("ReparseDataLength", ctypes.c_ushort),
("Reserved", ctypes.c_ushort),
("SubstituteNameOffset", ctypes.c_ushort),
("SubstituteNameLength", ctypes.c_ushort),
("PrintNameOffset", ctypes.c_ushort),
("PrintNameLength", ctypes.c_ushort),
("SubstituteNameBuffer", ctypes.c_wchar * substnamebufferchars),
("PrintNameBuffer", ctypes.c_wchar * 1)]

numpathbytes = (substnamebufferchars - 1) * sizeof(ctypes.c_wchar)
# We can't really use sizeof on the struct because of packing issues.
# Instead, calculate the size manually
buffersize = (numpathbytes + (sizeof(ctypes.c_wchar) * 2) +
(sizeof(ctypes.c_ushort) * 4))
if path is None:
buffer = REPARSE_DATA_BUFFER()
buffer.ReparseTag = IO_REPARSE_TAG_MOUNT_POINT
else:
buffer = REPARSE_DATA_BUFFER(
IO_REPARSE_TAG_MOUNT_POINT,
buffersize,
0,
# print name offset, length
0, numpathbytes,
# substitute name offset, length
numpathbytes + 2, 0,
# print name
path,
# substitute name
"")

return (buffer, buffersize + REPARSE_DATA_BUFFER.SubstituteNameOffset.offset)

def unparsed_convert(path):
path = os.path.abspath(path)
# Remove the trailing slash for root drives
if path[-2:] == ":\\":
path = path[:-1]
# This magic prefix disables parsing. Note that we do not want to use
# \\?\, since that doesn't tolerate a different case.
return "\\??\\" + path

def unparsed_unconvert(path):
if path[0:4] == "\\??\\":
path = path[4:]
return path

def isjunction(path):
if not os.path.exists(path) or not fs.junctions_supported(path):
return False

attrs = GetFileAttributes(path)
return bool((attrs & fs.FILE_ATTRIBUTE_DIRECTORY) and
(attrs & fs.FILE_ATTRIBUTE_REPARSE_POINT))

def create(source, link_name):
"""
Create a junction at link_name pointing to source.
"""
success = False
if not os.path.isdir(source):
raise Exception("%s is not a directory" % source)
if os.path.exists(link_name):
raise Exception("%s: junction link name already exists" % link_name)

link_name = os.path.abspath(link_name)
os.mkdir(link_name)

# Get a handle to the directory
hlink = CreateFile(link_name, fs.GENERIC_WRITE,
fs.FILE_SHARE_READ | fs.FILE_SHARE_WRITE, None, fs.OPEN_EXISTING,
fs.FILE_FLAG_OPEN_REPARSE_POINT | fs.FILE_FLAG_BACKUP_SEMANTICS,
None)
try:
if hlink == fs.INVALID_HANDLE_VALUE:
raise WinError(descr="Couldn't open directory to create junction")

srcvolpath = unparsed_convert(source)
(junctioninfo, infolen) = new_junction_reparse_buffer(srcvolpath)

dummy = DWORD(0)
res = DeviceIoControl(
hlink,
FSCTL_SET_REPARSE_POINT,
byref(junctioninfo),
infolen,
None,
0,
byref(dummy),
None)

if res == 0:
raise WinError(descr="Setting directory as junction failed")
success = True
finally:
if hlink != fs.INVALID_HANDLE_VALUE:
CloseHandle(hlink)
if not success:
os.rmdir(link_name)

def readlink(path):
# Make sure the path exists and is actually a junction
if not isjunction(path):
raise Exception("%s does not exist or is not a junction" % path)

hlink = CreateFile(path, fs.GENERIC_READ, fs.FILE_SHARE_READ, None,
fs.OPEN_EXISTING,
fs.FILE_FLAG_OPEN_REPARSE_POINT | fs.FILE_FLAG_BACKUP_SEMANTICS,
None)
if hlink == fs.INVALID_HANDLE_VALUE:
raise WinError(descr=("%s: couldn't open directory to read junction" % path))

try:
(junctioninfo, infolen) = new_junction_reparse_buffer()
dummy = DWORD(0)
res = DeviceIoControl(
hlink,
FSCTL_GET_REPARSE_POINT,
None,
0,
byref(junctioninfo),
infolen,
byref(dummy),
None)

if res == 0:
raise WinError(descr="Getting junction info failed")

return unparsed_unconvert(junctioninfo.SubstituteNameBuffer)
finally:
CloseHandle(hlink)

def unlink(path):
# Make sure the path exists and is actually a junction
if not isjunction(path):
raise Exception("%s does not exist or is not a junction" % path)
# Just get rid of the directory
os.rmdir(path)

0 comments on commit a51b2de

Please sign in to comment.